# Mimid : Inferring Grammars

* Code for subjects [here](#Our-subject-programs)
* Evaluation starts [here](#Evaluation)
  * The evaluation on specific subjects starts [here](#Subjects)
    * [CGIDecode](#CGIDecode)
    * [Calculator](#Calculator)
    * [MathExpr](#MathExpr)
    * [URLParse](#URLParse)
    * [Netrc](#Netrc)
    * [Microjson](#Microjson)
* Results are [here](#Results)
* Recovering parse tree from a recognizer is [here](#Using-a-Recognizer-(not-a-Parser))
* Recovering parse tree from parser combinators is [here](#Parsing-with-Parser-Combinators)
* Recovering parse tree from a PEG parser is [here](#Parsing-with-PEG-Parser)

Please note that a complete run can take an hour and a half to complete.
We start with a few Jupyter magics that let us specify examples inline; these can be turned off if needed for faster execution. Switch `TOP` to `False` if you do not want the examples to run.
```python
TOP = __name__ == '__main__'
```

The magics we use are `%%var` and `%top`. The `%%var` magic lets us specify large strings such as file contents directly, without too many escapes. The `%top` magic helps with examples.
```python
from IPython.core.magic import (Magics, magics_class, cell_magic,
                                line_magic, line_cell_magic)

class B(dict):
    def __getattr__(self, name):
        return self.__getitem__(name)

@magics_class
class MyMagics(Magics):
    def __init__(self, shell=None, **kwargs):
        super().__init__(shell=shell, **kwargs)
        self._vars = B()
        shell.user_ns['VARS'] = self._vars

    @cell_magic
    def var(self, line, cell):
        self._vars[line.strip()] = cell.strip()

    @line_cell_magic
    def top(self, line, cell=None):
        if TOP:
            if cell is None:
                cell = line
            ip = get_ipython()
            res = ip.run_cell(cell)

get_ipython().register_magics(MyMagics)
```

```python
import sys
```

Parts of the program, especially the subprocess execution using `do()`, require the new flags in Python `3.7`. I am not sure if the taints will work on anything above.
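The `B` helper above simply exposes dictionary keys as attributes, so variables stored through `%%var` can be read back either way. A minimal standalone sketch of that behavior (repeating the two-line class):

```python
class B(dict):
    def __getattr__(self, name):
        return self.__getitem__(name)

v = B()
v['Mimid'] = 'Testing Mimid'
assert v.Mimid == 'Testing Mimid'   # attribute access falls through to the dict
assert v['Mimid'] == v.Mimid        # both forms see the same value
```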
```python
%top assert sys.version_info[0:2] == (3, 7)
```

```python
from subprocess import run
```

```python
import os
```

We keep a log of all system commands executed for easier debugging at `./build/do.log`.
```python
import json

def do(command, env=None, shell=False, log=False, **args):
    result = run(command, universal_newlines=True, shell=shell,
                 env=dict(os.environ, **({} if env is None else env)),
                 capture_output=True, **args)
    if log:
        with open('build/do.log', 'a+') as f:
            print(json.dumps({'cmd': command, 'env': env,
                              'exitcode': result.returncode}), env, file=f)
    return result
```

```python
import random
```

Try to ensure replicability of measurements.
```python
random.seed(0)
```

Note that this notebook was tested on `Debian GNU/Linux 8.10 and 9.9` and `MacOS Mojave 10.14.5`. In particular, I do not know if everything will work on `Windows`.
```python
import shutil
```

```python
%%top
if shutil.which('lsb_release'):
    res = do(['lsb_release', '-d']).stdout
elif shutil.which('sw_vers'):
    res = do(['sw_vers']).stdout
else:
    assert False
res
```

```python
%top do(['jupyter', '--version']).stdout
```

Our code is based on the utilities provided by the [Fuzzingbook](http://fuzzingbook.org). Note that the measurements on time and precision in the paper were based on Fuzzingbook `0.0.7`. During development, we found a few bugs in Autogram, which we communicated back, resulting in a new version of Fuzzingbook `0.8.0`.

The fixed *Autogram* implementation of the *Fuzzingbook* has better precision rates for *Autogram*, and better timing for grammar generation. However, these numbers still fall short of *Mimid* for most grammars. Further, the grammars generated by *Autogram* are still enumerative. That is, rather than producing a context-free grammar, it simply appends input strings as alternates to the `<START>` nonterminal. This again results in bad recall numbers as before. Hence, it does not change our main points. For the remainder of this notebook, we use the `0.8.0` version of the Fuzzingbook.
First we define `pip_install()`, a helper to silently install required dependencies.
```python
def pip_install(v):
    return do(['pip', 'install', '-qqq', *v.split(' ')]).returncode
```

```python
%top pip_install('fuzzingbook==0.8.0')
```

Our external dependencies other than `fuzzingbook` are as follows.
```python
%top pip_install('astor graphviz scipy')
```

**IMPORTANT:** Restart the Jupyter server after installation of dependencies and extensions.
We recommend the following Jupyter notebook extensions:
```python
%top pip_install('jupyter_contrib_nbextensions jupyter_nbextensions_configurator')
```

```python
%top do(['jupyter', 'contrib', 'nbextension', 'install', '--user']).returncode
```

```python
def nb_enable(v):
    return do(['jupyter', 'nbextension', 'enable', v]).returncode
```

```python
%top do(['jupyter', 'nbextensions_configurator', 'enable', '--user']).returncode
```

#### Table of contents

Please install this extension. Navigation in the notebook is rather hard without it.
```python
%top nb_enable('toc2/main')
```

#### Collapsible headings

Again, do install this extension. It will let you fold away the sections that you have no immediate interest in.
```python
%top nb_enable('collapsible_headings/main')
```

#### Execute time

This is not strictly necessary, but can provide a better breakdown than the `timeit` that we use for timing.
```python
%top nb_enable('execute_time/ExecuteTime')
```

#### Code folding

Very helpful for hiding away the source contents of libraries that are not relevant to grammar recovery.
```python
%top nb_enable('codefolding/main')
```

To make runs faster, we cache quite a lot of things. Remove `build` if you change code or samples.
```python
%top do(['rm', '-rf', 'build']).returncode
```

As mentioned before, `%%var` defines a multi-line embedded string that is accessible from Python.
```python
%%var Mimid
Testing Mimid
```

```python
%top VARS['Mimid']
```

Note that our taint tracking implementation is incomplete in that only some of the functions in Python are proxied to preserve taints. Hence, we modify the source slightly where necessary to use the proxied functions, without affecting the evaluation of the grammar inference algorithm.
### Calculator.py

This is a really simple calculator written in textbook recursive descent style. Note that I have used `list()` in a few places to help out with taint tracking. This is due to the limitations of my taint tracking prototype. It can be fixed if required by simple AST walkers or better taint trackers.
```python
%%var calc_src
```

### Mathexpr.py

Originally from [here](https://github.com/louisfisch/mathematical-expressions-parser). The mathexpr is much more complicated than our `calculator` and supports advanced functionality such as predefined functions and variables.

```python
%%var mathexpr_src
```

### Microjson.py

The microjson is a complete pure-Python implementation of a JSON parser, obtained from [here](https://github.com/phensley/microjson). Note that we use `myio`, an instrumented version of the original `io`, to preserve taints.

```python
%%var microjson_src
```

### URLParse.py

This is the URL parser that is part of the Python distribution. The source was obtained from [here](https://github.com/python/cpython/blob/3.6/Lib/urllib/parse.py).

```python
%%var urlparse_src
```

### Netrc.py

Netrc is the initialization file that is read by web agents such as CURL. Python ships the netrc library in its standard distribution. This was taken from [here](https://github.com/python/cpython/blob/3.6/Lib/netrc.py). Note that we use `mylex` and `myio`, which correspond to `shlex` and `io` from the Python distribution, but instrumented to preserve taints.
```python
%%var netrc_src
```

### CGIDecode.py

CGIDecode is a program to decode a URL-encoded string. The source for this program was obtained from [here](https://www.fuzzingbook.org/html/Coverage.html).

```python
%%var cgidecode_src
```

### Subject Registry

We store all our subject programs under `program_src`.
```python
program_src = {
    'calculator.py': VARS['calc_src'],
    'mathexpr.py': VARS['mathexpr_src'],
    'urlparse.py': VARS['urlparse_src'],
    'netrc.py': VARS['netrc_src'],
    'cgidecode.py': VARS['cgidecode_src'],
    'microjson.py': VARS['microjson_src']}
```

## Rewriting the source to track control flow and taints.

We rewrite the source so that `astring in value` gets converted to `taint_wrap__(astring).in_(value)`. Note that what we are tracking is not really taints, but rather _character accesses_ to the origin string.
We also rewrite the methods so that method bodies are enclosed in a `method__` context manager, and any `if` conditions and `while` loops (only `while` for now) are enclosed in an outer `stack__` and an inner `scope__` context manager. This lets us track when the corresponding scopes are entered and left.
```python
import ast
import astor
```

### InRewriter

The `InRewriter` class handles transforming `in` expressions so that taints can be tracked. It has two methods. The `wrap()` method transforms any `a in lst` expression to `taint_wrap__(a) in lst`.
```python
class InRewriter(ast.NodeTransformer):
    def wrap(self, node):
        return ast.Call(func=ast.Name(id='taint_wrap__', ctx=ast.Load()),
                        args=[node], keywords=[])
```

The `wrap()` method is used internally by the `visit_Compare()` method to transform `a in lst` to `taint_wrap__(a).in_(lst)`. We need to do this because Python ties the overriding of the `in` operator to the `__contains__()` method of the class of `lst`. In our case, however, very often `a` is the element that is tainted and hence proxied. Hence we need a method invoked on the `a` object.
```python
class InRewriter(InRewriter):
    def visit_Compare(self, tree_node):
        left = tree_node.left
        if not tree_node.ops or not isinstance(tree_node.ops[0], ast.In):
            return tree_node
        mod_val = ast.Call(
            func=ast.Attribute(value=self.wrap(left), attr='in_'),
            args=tree_node.comparators, keywords=[])
        return mod_val
```

Tying it together.
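To see why the `.in_()` form is needed, recall that Python evaluates `x in lst` by calling `lst.__contains__(x)`, so a proxied left operand never gets control. A minimal sketch with a hypothetical `Proxy` class (invented for illustration, not the actual Mimid taint proxy):

```python
class Proxy(str):
    # hypothetical stand-in for a tainted string proxy
    def in_(self, container):
        # the proxy itself observes the membership check
        self.last_compared = list(container)
        return str(self) in container

p = Proxy('a')
assert ('a' in ['a', 'b'])        # dispatches to list.__contains__; a proxy sees nothing
assert p.in_(['a', 'b'])          # rewritten form: the proxy records the comparison
assert p.last_compared == ['a', 'b']
```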
```python
def rewrite_in(src):
    v = ast.fix_missing_locations(InRewriter().visit(ast.parse(src)))
    source = astor.to_source(v)
    return "%s" % source
```

```python
from fuzzingbook.fuzzingbook_utils import print_content
```

```python
%top print_content(rewrite_in('s in ["a", "b", "c"]'))
```

### Rewriter

The `Rewriter` class handles inserting tracing probes into methods and control structures. Essentially, we insert a `with` scope for the method body, and a `with` scope outside both `while` and `if` scopes. Finally, we insert a `with` scope inside the `while` and `if` scopes. IMPORTANT: We only implement the `while` context. Something similar should be implemented for the `for` context.
A few counters provide unique identifiers for the context managers. Essentially, we number each `if` and `while` that we see.
```python
class Rewriter(InRewriter):
    def init_counters(self):
        self.if_counter = 0
        self.while_counter = 0
```

The `methods[]` array keeps track of the current method stack during execution. `Epsilon` and `NoEpsilon` are simply constants that I use to indicate whether an `if` or a loop is nullable or not. If it is nullable, I mark it with `Epsilon`.
```python
methods = []
Epsilon = '-'
NoEpsilon = '='
```

The `wrap_in_method()` method generates a wrapper for method definitions.
```python
class Rewriter(Rewriter):
    def wrap_in_method(self, body, args):
        method_name_expr = ast.Str(methods[-1])
        my_args = ast.List(args.args, ast.Load())
        args = [method_name_expr, my_args]
        scope_expr = ast.Call(func=ast.Name(id='method__', ctx=ast.Load()),
                              args=args, keywords=[])
        return [ast.With(
            items=[ast.withitem(scope_expr, ast.Name(id='_method__'))],
            body=body)]
```

The method `visit_FunctionDef()` is the method rewriter that actually does the job.
```python
class Rewriter(Rewriter):
    def visit_FunctionDef(self, tree_node):
        self.init_counters()
        methods.append(tree_node.name)
        self.generic_visit(tree_node)
        tree_node.body = self.wrap_in_method(tree_node.body, tree_node.args)
        return tree_node
```

The `rewrite_def()` method wraps the function definitions in scopes.
```python
def rewrite_def(src):
    v = ast.fix_missing_locations(Rewriter().visit(ast.parse(src)))
    return astor.to_source(v)
```

We can use it as follows:
```python
%top print_content(rewrite_def('\n'.join(program_src['calculator.py'].split('\n')[12:19])), 'calculator.py')
```

#### The stack wrapper

The method `wrap_in_outer()` adds a `with ...stack...()` context _outside_ the control structures. The stack is used to keep track of the current control structure stack for any character comparison made. Notice the `can_empty` parameter. This indicates that the particular structure is _nullable_. For `if` we can make the decision right away. For `while` we postpone the decision.
```python
class Rewriter(Rewriter):
    def wrap_in_outer(self, name, can_empty, counter, node):
        name_expr = ast.Str(name)
        can_empty_expr = ast.Str(can_empty)
        counter_expr = ast.Num(counter)
        method_id = ast.Name(id='_method__')
        args = [name_expr, counter_expr, method_id, can_empty_expr]
        scope_expr = ast.Call(func=ast.Name(id='stack__', ctx=ast.Load()),
                              args=args, keywords=[])
        return ast.With(
            items=[ast.withitem(scope_expr,
                                ast.Name(id='%s_%d_stack__' % (name, counter)))],
            body=[node])
```

#### The scope wrapper

The method `wrap_in_inner()` adds a `with ...scope...()` context immediately inside the control structure. For `while`, this means simply adding one `with ...scope...()` just before the first line. For `if`, this means adding one `with ...scope...()` to each branch of the `if` condition.
```python
class Rewriter(Rewriter):
    def wrap_in_inner(self, name, counter, val, body):
        val_expr = ast.Num(val)
        stack_iter = ast.Name(id='%s_%d_stack__' % (name, counter))
        method_id = ast.Name(id='_method__')
        args = [val_expr, stack_iter, method_id]
        scope_expr = ast.Call(func=ast.Name(id='scope__', ctx=ast.Load()),
                              args=args, keywords=[])
        return [ast.With(
            items=[ast.withitem(scope_expr,
                                ast.Name(id='%s_%d_%d_scope__' % (name, counter, val)))],
            body=body)]
```

#### Rewriting `If` conditions

While rewriting `if` conditions, we have to take care of cascading `if` conditions (`elif`), which are represented as nested `if` conditions in the AST. They do not require a separate `stack` context, only separate `scope` contexts.
```python
class Rewriter(Rewriter):
    def process_if(self, tree_node, counter, val=None):
        if val is None:
            val = 0
        else:
            val += 1
        if_body = []
        self.generic_visit(tree_node.test)
        for node in tree_node.body:
            self.generic_visit(node)
        tree_node.body = self.wrap_in_inner('if', counter, val, tree_node.body)
        # else part.
        if len(tree_node.orelse) == 1 and isinstance(tree_node.orelse[0], ast.If):
            self.process_if(tree_node.orelse[0], counter, val)
        else:
            if tree_node.orelse:
                val += 1
                for node in tree_node.orelse:
                    self.generic_visit(node)
                tree_node.orelse = self.wrap_in_inner('if', counter, val, tree_node.orelse)
```

We also need to identify whether the cascading `if` conditions (`elif`) have an empty `orelse` clause or not. If the `orelse` is empty, then the entire set of `if` conditions may be excised and still produce a valid value. Hence, it should be marked as optional. The `visit_If()` method checks whether the cascading `if`s have an `orelse` or not.
```python
class Rewriter(Rewriter):
    def visit_If(self, tree_node):
        self.if_counter += 1
        counter = self.if_counter
        # is it empty?
        start = tree_node
        while start:
            if isinstance(start, ast.If):
                if not start.orelse:
                    start = None
                elif len(start.orelse) == 1:
                    start = start.orelse[0]
                else:
                    break
            else:
                break
        self.process_if(tree_node, counter=self.if_counter)
        can_empty = NoEpsilon if start else Epsilon  # NoEpsilon for + and Epsilon for *
        return self.wrap_in_outer('if', can_empty, counter, tree_node)
```

#### Rewriting `while` loops

Rewriting `while` loops is simple. We wrap them in `stack` and `scope` contexts. We do not implement the `orelse` feature yet.
```python
class Rewriter(Rewriter):
    def visit_While(self, tree_node):
        self.generic_visit(tree_node)
        self.while_counter += 1
        counter = self.while_counter
        test = tree_node.test
        body = tree_node.body
        assert not tree_node.orelse
        tree_node.body = self.wrap_in_inner('while', counter, 0, body)
        return self.wrap_in_outer('while', '?', counter, tree_node)
```

```python
def rewrite_cf(src, original):
    v = ast.fix_missing_locations(Rewriter().visit(ast.parse(src)))
    return astor.to_source(v)
```

An example with `if` conditions.
```python
%top print_content('\n'.join(program_src['calculator.py'].split('\n')[12:19]), 'calculator.py')
```

```python
%top print_content(rewrite_cf('\n'.join(program_src['calculator.py'].split('\n')[12:19]), 'calculator.py').strip(), filename='calculator.py')
```

An example with `while` loops.
```python
%top print_content('\n'.join(program_src['calculator.py'].split('\n')[5:11]), 'calculator.py')
```

```python
%top print_content(rewrite_cf('\n'.join(program_src['calculator.py'].split('\n')[5:11]), 'calculator.py'), filename='calculator.py')
```

#### Generating the complete instrumented source

For the complete instrumented source, we first need to make sure that all necessary imports are satisfied. Next, we also need to invoke the parser with the necessary tainted input, and output the trace.
```python
def rewrite(src, original):
    tree = ast.fix_missing_locations(InRewriter().visit(ast.parse(src)))
    v = ast.fix_missing_locations(Rewriter().visit(tree))
    header = """from mimid_context import scope__, stack__, method__
import json
import sys
import taints
from taints import taint_wrap__
"""
    source = astor.to_source(v)
    footer = """
if __name__ == "__main__":
    js = []
    for arg in sys.argv[1:]:
        with open(arg) as f:
            mystring = f.read().strip().replace('\\n', ' ')
        taints.trace_init()
        tainted_input = taints.wrap_input(mystring)
        main(tainted_input)
        assert tainted_input.comparisons
        j = {
            'comparisons_fmt': 'idx, char, method_call_id',
            'comparisons': taints.convert_comparisons(tainted_input.comparisons, mystring),
            'method_map_fmt': 'method_call_id, method_name, children',
            'method_map': taints.convert_method_map(taints.METHOD_MAP),
            'inputstr': mystring,
            'original': %s,
            'arg': arg}
        js.append(j)
    print(json.dumps(js))
"""
    footer = footer % repr(original)
    return "%s\n%s\n%s" % (header, source, footer)
```

```python
%top calc_parse_rewritten = rewrite(program_src['calculator.py'], original='calculator.py')
```

```python
%top print_content(calc_parse_rewritten, filename='calculator.py')
```

We will now write out the transformed sources.
```python
do(['mkdir', '-p', 'build', 'subjects', 'samples']).returncode
```

```python
for file_name in program_src:
    print(file_name)
    with open("subjects/%s" % file_name, 'wb+') as f:
        f.write(program_src[file_name].encode('utf-8'))
    with open("build/%s" % file_name, 'w+') as f:
        f.write(rewrite(program_src[file_name], file_name))
```

### Context Managers

The context managers are probes inserted into the source code so that we know when execution enters and exits specific control flow structures such as conditionals and loops. Note that access to the source code for these probes is not really a requirement. They can be inserted directly into binaries too, or even dynamically inserted using tools such as `dtrace`. For now, we make our life simple using AST editing.
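As a minimal self-contained illustration of the probe idea (a hypothetical `probe__` context manager and `trace` list, invented for this sketch and not part of the actual Mimid probes defined below), a `with` block can record when a scope is entered and left, with nesting preserved:

```python
trace = []

class probe__:
    # hypothetical probe: records scope entry and exit in `trace`
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        trace.append(('enter', self.name))
        return self

    def __exit__(self, *args):
        trace.append(('exit', self.name))

with probe__('parse_expr:while_1'):
    with probe__('parse_expr:if_1'):
        pass

assert trace == [('enter', 'parse_expr:while_1'),
                 ('enter', 'parse_expr:if_1'),
                 ('exit', 'parse_expr:if_1'),
                 ('exit', 'parse_expr:while_1')]
```

The real probes below follow the same pattern, but additionally record the method stack and the comparison context.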
#### Method context

The `method__` context handles the assignment of the method name, as well as storing the method stack.
```python
%%var mimid_method_context
import taints
import urllib.parse

def to_key(method, name, num):
    return '%s:%s_%s' % (method, name, num)

class method__:
    def __init__(self, name, args):
        if not taints.METHOD_NUM_STACK:
            return
        self.args = '_'.join([urllib.parse.quote(i) for i in args if type(i) == str])
        if not self.args:
            self.name = name
        else:
            self.name = "%s__%s" % (name, self.args)  # <- not for now
        #TODO if args and hasattr(args[0], 'tag'):
        #    self.name = "%s:%s" % (args[0].tag, self.name)
        taints.trace_call(self.name)

    def __enter__(self):
        if not taints.METHOD_NUM_STACK:
            return
        taints.trace_set_method(self.name)
        self.stack = []
        return self

    def __exit__(self, *args):
        if not taints.METHOD_NUM_STACK:
            return
        taints.trace_return()
        taints.trace_set_method(self.name)
```

The stack context stores the current prefix, and handles updating the stack that is stored in the method context.
```python
%%var mimid_stack_context
class stack__:
    def __init__(self, name, num, method_i, can_empty):
        if not taints.METHOD_NUM_STACK:
            return
        self.method_stack = method_i.stack
        self.can_empty = can_empty  # * means yes, + means no, ? means to be determined
        self.name, self.num, self.method = name, num, method_i.name
        self.prefix = to_key(self.method, self.name, self.num)

    def __enter__(self):
        if not taints.METHOD_NUM_STACK:
            return
        if self.name in {'while'}:
            self.method_stack.append(0)
        elif self.name in {'if'}:
            self.method_stack.append(-1)
        else:
            assert False
        return self

    def __exit__(self, *args):
        if not taints.METHOD_NUM_STACK:
            return
        self.method_stack.pop()
```

#### Scope context

The scope context identifies when a control structure is entered and exited (in the case of loops), and which alternative was entered (in the case of `if` conditions).
```python
%%var mimid_scope_context
import json

class scope__:
    def __init__(self, alt, stack_i, method_i):
        if not taints.METHOD_NUM_STACK:
            return
        self.name, self.num, self.method, self.alt = stack_i.name, stack_i.num, stack_i.method, alt
        self.method_stack = method_i.stack
        self.can_empty = stack_i.can_empty

    def __enter__(self):
        if not taints.METHOD_NUM_STACK:
            return
        if self.name in {'while'}:
            self.method_stack[-1] += 1
        elif self.name in {'if'}:
            pass
        else:
            assert False, self.name
        uid = json.dumps(self.method_stack)
        if self.name in {'while'}:
            taints.trace_call('%s:%s_%s %s %s' % (
                self.method, self.name, self.num, self.can_empty, uid))
        else:
            taints.trace_call('%s:%s_%s %s %s#%s' % (
                self.method, self.name, self.num, self.can_empty, self.alt, uid))
        taints.trace_set_method(self.name)
        return self

    def __exit__(self, *args):
        if not taints.METHOD_NUM_STACK:
            return
        taints.trace_return()
        taints.trace_set_method(self.name)
```

### Taint Tracker

The taint tracker is essentially a reimplementation of the information-flow taints from the Fuzzingbook. It incorporates tracing of character accesses. IMPORTANT: Not all methods are implemented.
```python
%%var taints_src
```

We write both files to the appropriate locations.
```python
with open('build/mimid_context.py', 'w+') as f:
    print(VARS['mimid_method_context'], file=f)
    print(VARS['mimid_stack_context'], file=f)
    print(VARS['mimid_scope_context'], file=f)
with open('build/taints.py', 'w+') as f:
    print(VARS['taints_src'], file=f)
```

Here is how one can generate traces for the `calc` program.
```python
%top do(['mkdir', '-p', 'samples/calc']).returncode
```

```python
%top do(['mkdir', '-p', 'samples/mathexpr']).returncode
```

```python
%%top
with open('samples/calc/0.csv', 'w+') as f:
    print('9-(16+72)*3/458', file=f)
with open('samples/calc/1.csv', 'w+') as f:
    print('(9)+3/4/58', file=f)
with open('samples/calc/2.csv', 'w+') as f:
    print('8*3/40', file=f)
```

Generating traces on `mathexpr`.
```python
%%top
with open('samples/mathexpr/0.csv', 'w+') as f:
    print('100', file=f)
with open('samples/mathexpr/1.csv', 'w+') as f:
    print('2 + 3', file=f)
with open('samples/mathexpr/2.csv', 'w+') as f:
    print('4 * 5', file=f)
```

```python
%top calc_trace_out = do("python build/calculator.py samples/calc/*.csv", shell=True).stdout
```

```python
%top mathexpr_trace_out = do("python build/mathexpr.py samples/mathexpr/*.csv", shell=True).stdout
```

```python
import json
```

```python
%top calc_trace = json.loads(calc_trace_out)
```

```python
%top mathexpr_trace = json.loads(mathexpr_trace_out)
```

### Reconstructing the Method Tree with Attached Character Comparisons

We reconstruct the actual method tree from a trace with the following format:

```
key : [ mid, method_name, children_ids ]
```
```python
def reconstruct_method_tree(method_map):
    first_id = None
    tree_map = {}
    for key in method_map:
        m_id, m_name, m_children = method_map[key]
        children = []
        if m_id in tree_map:
            # just update the name and children
            assert not tree_map[m_id]
            tree_map[m_id]['id'] = m_id
            tree_map[m_id]['name'] = m_name
            tree_map[m_id]['indexes'] = []
            tree_map[m_id]['children'] = children
        else:
            assert first_id is None
            tree_map[m_id] = {'id': m_id, 'name': m_name,
                              'children': children, 'indexes': []}
            first_id = m_id
        for c in m_children:
            assert c not in tree_map
            val = {}
            tree_map[c] = val
            children.append(val)
    return first_id, tree_map
```

Here is how one would use it. The first element in the returned tuple is the id of the bottom-most method call.
```python
from fuzzingbook.GrammarFuzzer import display_tree
```

```python
%top first, calc_method_tree1 = reconstruct_method_tree(calc_trace[0]['method_map'])
```

```python
%top first, mathexpr_method_tree1 = reconstruct_method_tree(mathexpr_trace[0]['method_map'])
```

```python
def extract_node(node, id):
    symbol = str(node['id'])
    children = node['children']
    annotation = str(node['name'])
    return "%s:%s" % (symbol, annotation), children, ''
```

```python
%top v = display_tree(calc_method_tree1[0], extract_node=extract_node)
```

```python
from IPython.display import Image
```

```python
def zoom(v, zoom=True):
    # return v directly if you do not want to zoom out.
    if zoom:
        return Image(v.render(format='png'))
    return v
```

```python
%top zoom(v)
```

```python
%top zoom(display_tree(mathexpr_method_tree1[0], extract_node=extract_node))
```

#### Identifying last comparisons

We need only the last comparisons made on any index. This means that we should care only about the last parse in an ambiguous parse. However, to make concessions for the real world, we also check whether we are overwriting a child (`HEURISTIC`). Note that `URLParser` is the only parser that needs this heuristic.
```python
def last_comparisons(comparisons):
    HEURISTIC = True
    last_cmp_only = {}
    last_idx = {}
    # get the last indexes compared in methods.
    for idx, char, mid in comparisons:
        if mid in last_idx:
            if idx > last_idx[mid]:
                last_idx[mid] = idx
        else:
            last_idx[mid] = idx
    for idx, char, mid in comparisons:
        if HEURISTIC:
            if idx in last_cmp_only:
                if last_cmp_only[idx] > mid:
                    # do not clobber children unless it was the last character
                    # for that child.
                    if last_idx[mid] > idx:
                        # if it was the last index, may be the child used it
                        # as a boundary check.
                        continue
        last_cmp_only[idx] = mid
    return last_cmp_only
```

Here is how one would use it.
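The last-wins rule and the child-protecting heuristic can be seen on synthetic comparison triples (the `(index, char, method_id)` data below is invented; the function is restated here in a slightly condensed but equivalent form so the sketch runs on its own):

```python
def last_comparisons(comparisons):
    # comparisons: (input_index, char, method_id) triples in execution order;
    # per index we keep the method that compared it last.
    last_cmp_only = {}
    last_idx = {}
    for idx, char, mid in comparisons:      # last index each method touched
        if mid not in last_idx or idx > last_idx[mid]:
            last_idx[mid] = idx
    for idx, char, mid in comparisons:
        if idx in last_cmp_only and last_cmp_only[idx] > mid:
            # do not clobber a later (child) method's claim when the earlier
            # method's comparison at idx was merely a boundary check
            if last_idx[mid] > idx:
                continue
        last_cmp_only[idx] = mid
    return last_cmp_only

# later comparisons win: index 0 ends up with method 2, index 1 with method 3
print(last_comparisons([(0, '9', 1), (0, '9', 2), (1, '+', 2), (1, '+', 3)]))
# → {0: 2, 1: 3}

# heuristic: method 2 compares index 0 *after* method 5, but index 0 is not
# method 2's last index, so method 5's claim on index 0 is preserved
print(last_comparisons([(0, 'a', 5), (1, 'a', 2), (0, 'b', 2)]))
# → {0: 5, 1: 2}
```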
```python
%top calc_last_comparisons1 = last_comparisons(calc_trace[0]['comparisons'])
```

```python
%top calc_last_comparisons1
```

```python
%top mathexpr_last_comparisons1 = last_comparisons(mathexpr_trace[0]['comparisons'])
```

```python
%top mathexpr_last_comparisons1
```

#### Attaching characters to the tree

Add the comparison indexes to the method tree that we constructed.
```python
def attach_comparisons(method_tree, comparisons):
    for idx in comparisons:
        mid = comparisons[idx]
        method_tree[mid]['indexes'].append(idx)
```

Here is how one would use it. Note which method call each input index is associated with. For example, the first index is associated with method call id 6, which corresponds to `is_digit`.
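The attachment itself is a simple inversion of the `index → method id` map. A standalone sketch on a hypothetical two-node tree (both the tree and the comparison map below are made up for illustration):

```python
def attach_comparisons(method_tree, comparisons):
    # comparisons maps input index -> id of the method that last compared it
    for idx in comparisons:
        method_tree[comparisons[idx]]['indexes'].append(idx)

tree = {1: {'name': 'parse_num', 'indexes': []},
        2: {'name': 'is_digit', 'indexes': []}}
attach_comparisons(tree, {0: 2, 1: 2, 2: 1})
print(tree[2]['indexes'], tree[1]['indexes'])  # → [0, 1] [2]
```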
```python
%top attach_comparisons(calc_method_tree1, calc_last_comparisons1)
```

```python
%top calc_method_tree1
```

```python
%top attach_comparisons(mathexpr_method_tree1, mathexpr_last_comparisons1)
```

```python
%top mathexpr_method_tree1
```

```python
def wrap_input(istr):
    def extract_node(node, id):
        symbol = str(node['id'])
        children = node['children']
        annotation = str(node['name'])
        indexes = repr(tuple([istr[i] for i in node['indexes']]))
        return "%s %s" % (annotation, indexes), children, ''
    return extract_node
```

```python
%top extract_node1 = wrap_input(calc_trace[0]['inputstr'])
```

```python
%top zoom(display_tree(calc_method_tree1[0], extract_node=extract_node1))
```

```python
%top extract_node1 = wrap_input(mathexpr_trace[0]['inputstr'])
```

```python
%top zoom(display_tree(mathexpr_method_tree1[0], extract_node=extract_node1))
```

We define `to_node()`, a convenience function that, given a list of _contiguous_ indexes and the original string, translates it to a leaf node of a tree (corresponding to the derivation tree syntax in the Fuzzingbook) with a string, empty children, and a starting and an ending index.
Convert a list of indexes to a corresponding terminal tree node.
```python
def to_node(idxes, my_str):
    assert len(idxes) == idxes[-1] - idxes[0] + 1
    assert min(idxes) == idxes[0]
    assert max(idxes) == idxes[-1]
    return my_str[idxes[0]:idxes[-1] + 1], [], idxes[0], idxes[-1]
```

Here is how one would use it.
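A quick standalone check with a synthetic index list (the string `'9-876'` is invented; the function is restated so the sketch is self-contained). The three asserts spell out what "contiguous" means: the run is sorted, gap-free, and bounded by its first and last elements.

```python
def to_node(idxes, my_str):
    # idxes must be a sorted, gap-free run of indexes into my_str
    assert len(idxes) == idxes[-1] - idxes[0] + 1
    assert min(idxes) == idxes[0]
    assert max(idxes) == idxes[-1]
    return my_str[idxes[0]:idxes[-1] + 1], [], idxes[0], idxes[-1]

print(to_node([2, 3, 4], '9-876'))  # → ('876', [], 2, 4)
```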
```python
%top to_node(calc_method_tree1[6]['indexes'], calc_trace[0]['inputstr'])
```

```python
from operator import itemgetter
import itertools as it
```

We now need to identify the terminal (leaf) nodes. For that, we want to group contiguous letters in a node together and call each group a leaf node. So, we first convert our list of indexes to lists of contiguous indexes, then convert these to terminal tree nodes. The result is a set of one-level child nodes with contiguous characters from the indexes.
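The grouping described above relies on a classic `itertools.groupby` idiom: within a contiguous run, an index minus its position in the list is constant, so grouping by that difference splits the list into runs. A standalone sketch (the index list is invented):

```python
from operator import itemgetter
import itertools as it

def contiguous_runs(indexes):
    # x[0] - x[1] (position minus index) is constant inside a contiguous run
    return [list(map(itemgetter(1), g))
            for k, g in it.groupby(enumerate(indexes), lambda x: x[0] - x[1])]

print(contiguous_runs([0, 1, 2, 5, 6, 9]))  # → [[0, 1, 2], [5, 6], [9]]
```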
```python
def indexes_to_children(indexes, my_str):
    lst = [
        list(map(itemgetter(1), g))
        for k, g in it.groupby(enumerate(indexes), lambda x: x[0] - x[1])
    ]
    return [to_node(n, my_str) for n in lst]
```

```python
%top indexes_to_children(calc_method_tree1[6]['indexes'], calc_trace[0]['inputstr'])
```

Finally, we need to remove the overlap from the trees we have so far. The idea is that, given a node, each child node should be uniquely responsible for a specified range of characters, with no overlap allowed between the children. The range of the node then spans from the start of the first child to the end of the last child.
#### Removing Overlap

If overlap is found, the tie is biased toward the later child. That is, the later child gets to keep the range, and the former child is recursively traversed to remove overlaps from its children. If a child is completely included in the overlap, the child is excised. A few convenience functions first:
```python
def does_item_overlap(s, e, s_, e_):
    return (s_ >= s and s_ <= e) or (e_ >= s and e_ <= e) or (s_ <= s and e_ >= e)
```

```python
def is_second_item_included(s, e, s_, e_):
    return (s_ >= s and e_ <= e)
```

```python
def has_overlap(ranges, s_, e_):
    return {(s, e) for (s, e) in ranges if does_item_overlap(s, e, s_, e_)}
```

```python
def is_included(ranges, s_, e_):
    return {(s, e) for (s, e) in ranges if is_second_item_included(s, e, s_, e_)}
```

```python
def remove_overlap_from(original_node, orange):
    node, children, start, end = original_node
    new_children = []
    if not children:
        return None
    start = -1
    end = -1
    for child in children:
        if does_item_overlap(*child[2:4], *orange):
            new_child = remove_overlap_from(child, orange)
            if new_child:  # and new_child[1]:
                if start == -1:
                    start = new_child[2]
                new_children.append(new_child)
                end = new_child[3]
        else:
            new_children.append(child)
            if start == -1:
                start = child[2]
            end = child[3]
    if not new_children:
        return None
    assert start != -1
    assert end != -1
    return (node, new_children, start, end)
```

Verify that there is no overlap.
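A quick sanity check of the two range predicates defined above, restated standalone. The ranges are *inclusive* on both ends, which is why the examples below use endpoint-touching cases:

```python
def does_item_overlap(s, e, s_, e_):
    # inclusive ranges: [s_, e_] starts inside, ends inside, or covers [s, e]
    return (s_ >= s and s_ <= e) or (e_ >= s and e_ <= e) or (s_ <= s and e_ >= e)

def is_second_item_included(s, e, s_, e_):
    return s_ >= s and e_ <= e

assert does_item_overlap(0, 5, 3, 8)       # partial overlap
assert does_item_overlap(2, 3, 0, 10)      # (0,10) covers (2,3)
assert not does_item_overlap(0, 2, 3, 5)   # disjoint
assert is_second_item_included(0, 5, 1, 4)
assert not is_second_item_included(0, 5, 4, 6)
```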
```python
def no_overlap(arr):
    my_ranges = {}
    for a in arr:
        _, _, s, e = a
        included = is_included(my_ranges, s, e)
        if included:
            continue  # we will fill up the blanks later.
        else:
            overlaps = has_overlap(my_ranges, s, e)
            if overlaps:
                # unlike include which can happen only once in a set of
                # non-overlapping ranges, overlaps can happen on multiple parts.
                # The rule is, the later child gets the say. So, we recursively
                # remove any ranges that overlap with the current one from the
                # overlapped range.
                assert len(overlaps) == 1
                oitem = list(overlaps)[0]
                v = remove_overlap_from(my_ranges[oitem], (s, e))
                del my_ranges[oitem]
                if v:
                    my_ranges[v[2:4]] = v
                my_ranges[(s, e)] = a
            else:
                my_ranges[(s, e)] = a
    res = my_ranges.values()
    # assert no overlap, and order by starting index
    s = sorted(res, key=lambda x: x[2])
    return s
```

#### Generate derivation tree

Convert a mapped tree to the _fuzzingbook_-style derivation tree.
```python
def to_tree(node, my_str):
    method_name = ("<%s>" % node['name']) if node['name'] is not None else '<START>'
    indexes = node['indexes']
    node_children = [to_tree(c, my_str) for c in node.get('children', [])]
    idx_children = indexes_to_children(indexes, my_str)
    children = no_overlap([c for c in node_children if c is not None] + idx_children)
    if not children:
        return None
    start_idx = children[0][2]
    end_idx = children[-1][3]
    si = start_idx
    my_children = []
    # FILL IN chars that we did not compare. This is likely due to an i + n
    # instruction.
    for c in children:
        if c[2] != si:
            sbs = my_str[si: c[2]]
            my_children.append((sbs, [], si, c[2] - 1))
        my_children.append(c)
        si = c[3] + 1
    m = (method_name, my_children, start_idx, end_idx)
    return m
```

```python
%top zoom(display_tree(to_tree(calc_method_tree1[0], calc_trace[0]['inputstr'])))
```

```python
%top zoom(display_tree(to_tree(mathexpr_method_tree1[0], mathexpr_trace[0]['inputstr'])))
```

### The Complete Miner

We now put everything together. The `miner()` takes the traces, produces trees out of them, and verifies that the trees actually correspond to the input.
```python
from fuzzingbook.GrammarFuzzer import tree_to_string
```

```python
def miner(call_traces):
    my_trees = []
    for call_trace in call_traces:
        method_map = call_trace['method_map']
        first, method_tree = reconstruct_method_tree(method_map)
        comparisons = call_trace['comparisons']
        attach_comparisons(method_tree, last_comparisons(comparisons))
        my_str = call_trace['inputstr']
        # print("INPUT:", my_str, file=sys.stderr)
        tree = to_tree(method_tree[first], my_str)
        # print("RECONSTRUCTED INPUT:", tree_to_string(tree), file=sys.stderr)
        my_tree = {'tree': tree,
                   'original': call_trace['original'],
                   'arg': call_trace['arg']}
        assert tree_to_string(tree) == my_str
        my_trees.append(my_tree)
    return my_trees
```

Using the `miner()`:
```python
%top mined_calc_trees = miner(calc_trace)
%top calc_tree = mined_calc_trees[0]
%top zoom(display_tree(calc_tree['tree']))
```

```python
%top mined_mathexpr_trees = miner(mathexpr_trace)
%top mathexpr_tree = mined_mathexpr_trees[1]
%top zoom(display_tree(mathexpr_tree['tree']))
```

One of the problems you can notice in the generated tree is that each `while` iteration gets a different identifier, e.g.

```
('<parse_expr:while_1 ? [2]>', [('+', [], 5, 5)], 5, 5),
('<parse_expr:while_1 ? [3]>',
 [('<parse_expr:if_1 + 0#[3, -1]>',
   [('<parse_num>',
     [('<is_digit>', [('7', [], 6, 6)], 6, 6),
      ('<is_digit>', [('2', [], 7, 7)], 7, 7)],
     6,
     7)],
   6,
   7)],
```

The separate identifiers are intentional because we do not yet know the actual dependencies between different iterations, such as closing quotes, braces, or parentheses. However, this creates a problem when we mine the grammar, because we need to match up the compatible nodes. The generalizer does this by actively performing surgery on the tree to see whether a node is replaceable with another.
```python
import copy
import random
```

### Checking compatibility of nodes

We first need a few helper functions. The `replace_nodes()` function tries to replace the _contents_ of the first node with the _contents_ of the second (that is, the tree containing these nodes is modified in place), collects the produced string from the tree, and resets any changes. The arguments are tuples with the following format: `(node, file_name, tree)`.
```python
def replace_nodes(a2, a1):
    node2, _, t2 = a2
    node1, _, t1 = a1
    str2_old = tree_to_string(t2)
    old = copy.copy(node2)
    node2.clear()
    for n in node1:
        node2.append(n)
    str2_new = tree_to_string(t2)
    assert str2_old != str2_new
    node2.clear()
    for n in old:
        node2.append(n)
    str2_last = tree_to_string(t2)
    assert str2_old == str2_last
    return str2_new
```

Can a given node be replaced with another? The idea is, given two nodes (possibly from two trees), can the first node be replaced by the second and still result in a valid string?
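Before involving the parser, the splice-and-restore mechanics can be exercised on a toy tree. The sketch below uses a minimal stand-in for fuzzingbook's `tree_to_string` and invented list-based trees over `'1+2'` and `'3'`; both stand-ins are assumptions made so the example runs on its own:

```python
import copy

def tree_to_string(tree):
    # minimal stand-in: concatenate leaves; a childless nonterminal is empty
    name, children, *_ = tree
    if not children:
        return name if not name.startswith('<') else ''
    return ''.join(tree_to_string(c) for c in children)

def replace_nodes(a2, a1):
    # splice the contents of node a1 into node a2, read off the string,
    # then restore a2 -- the tree is modified only temporarily
    node2, _, t2 = a2
    node1, _, t1 = a1
    str2_old = tree_to_string(t2)
    old = copy.copy(node2)
    node2.clear()
    for n in node1:
        node2.append(n)
    str2_new = tree_to_string(t2)
    node2.clear()
    for n in old:
        node2.append(n)
    assert tree_to_string(t2) == str2_old  # fully restored
    return str2_new

t1 = ['<expr>', [['<num>', [['1', []]]], ['+', []], ['<num>', [['2', []]]]]]
t2 = ['<num>', [['3', []]]]
num_node = t1[1][0]                           # the first <num> in t1
out = replace_nodes((num_node, 'f', t1), (t2, 'f', t2))
print(out)                                    # → '3+2'
print(tree_to_string(t1))                     # → '1+2' (t1 unchanged)
```

This is why the trees must first be made modifiable (lists, not tuples): the splice mutates the node in place.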
```python
def is_compatible(a1, a2, module):
    if tree_to_string(a1[0]) == tree_to_string(a2[0]):
        return True
    my_string = replace_nodes(a1, a2)
    return check(my_string, module)
```

```python
%%var check_src
# [(
import sys, imp

parse_ = imp.new_module('parse_')

def init_module(src):
    with open(src) as sf:
        exec(sf.read(), parse_.__dict__)

def _check(s):
    try:
        parse_.main(s)
        return True
    except:
        return False

def main(args):
    init_module(args[0])
    if _check(args[1]):
        sys.exit(0)
    else:
        sys.exit(1)

main(sys.argv[1:])
# )]
```

```python
# [(
with open('build/check.py', 'w+') as f:
    print(VARS['check_src'], file=f)
# )]
```

```python
EXEC_MAP = {}
NODE_REGISTER = {}
TREE = None
FILE = None
```

```python
def reset_generalizer():
    global NODE_REGISTER, TREE, FILE, EXEC_MAP
    NODE_REGISTER = {}
    TREE = None
    FILE = None
    EXEC_MAP = {}
```

```python
reset_generalizer()
```

```python
import os.path
```

```python
def check(s, module):
    if s in EXEC_MAP:
        return EXEC_MAP[s]
    result = do(["python", "./build/check.py", "subjects/%s" % module, s], shell=False)
    with open('build/%s.log' % module, 'a+') as f:
        print(s, file=f)
        print(' '.join(["python", "./build/check.py", "subjects/%s" % module, s]), file=f)
        print(":=", result.returncode, file=f)
        print("\n", file=f)
    v = (result.returncode == 0)
    EXEC_MAP[s] = v
    return v
```

```python
def to_modifiable(derivation_tree):
    node, children, *rest = derivation_tree
    return [node, [to_modifiable(c) for c in children], *rest]
```

```python
%top calc_tree_ = to_modifiable(calc_tree['tree'])
%top while_loops = calc_tree_[1][0][1][0][1]
```

```python
%top while_loops[0]
```

```python
%top while_loops[1]
```

```python
%top assert not is_compatible((while_loops[1], 'c.py', calc_tree_), (while_loops[0], 'c.py', calc_tree_), 'calculator.py')
```

```python
%top assert is_compatible((while_loops[0], 'c.py', calc_tree_), (while_loops[2], 'c.py', calc_tree_), 'calculator.py')
```

We need to extract meta information from the names, and update it back.
TODO: make the meta info JSON.
```python
def parse_name(name):
    assert name[0] + name[-1] == '<>'
    name = name[1:-1]
    method, rest = name.split(':')
    ctrl_name, space, rest = rest.partition(' ')
    can_empty, space, stack = rest.partition(' ')
    ctrl, cname = ctrl_name.split('_')
    if ':while_' in name:
        method_stack = json.loads(stack)
        return method, ctrl, int(cname), 0, can_empty, method_stack
    elif ':if_' in name:
        num, mstack = stack.split('#')
        method_stack = json.loads(mstack)
        return method, ctrl, int(cname), num, can_empty, method_stack
```

```python
%top [parse_name(w[0]) for w in while_loops]
```

```python
def unparse_name(method, ctrl, name, num, can_empty, cstack):
    if ctrl == 'while':
        return "<%s:%s_%s %s %s>" % (method, ctrl, name, can_empty, json.dumps(cstack))
    else:
        return "<%s:%s_%s %s %s#%s>" % (method, ctrl, name, can_empty, num, json.dumps(cstack))
```

Verify that parsing and unparsing works.
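A standalone round-trip on two hypothetical labels (the labels mimic the `while`/`if` formats shown in the tree dump earlier; the functions are restated here with the control kind dispatched on `ctrl` rather than a substring test, a slight simplification):

```python
import json

def parse_name(name):
    # split '<parse_expr:while_1 ? [2]>' into its components
    assert name[0] + name[-1] == '<>'
    name = name[1:-1]
    method, rest = name.split(':')
    ctrl_name, _, rest = rest.partition(' ')
    can_empty, _, stack = rest.partition(' ')
    ctrl, cname = ctrl_name.split('_')
    if ctrl == 'while':
        return method, ctrl, int(cname), 0, can_empty, json.loads(stack)
    else:  # 'if' labels carry an extra alternative number before the stack
        num, mstack = stack.split('#')
        return method, ctrl, int(cname), num, can_empty, json.loads(mstack)

def unparse_name(method, ctrl, name, num, can_empty, cstack):
    if ctrl == 'while':
        return "<%s:%s_%s %s %s>" % (method, ctrl, name, can_empty, json.dumps(cstack))
    return "<%s:%s_%s %s %s#%s>" % (method, ctrl, name, can_empty, num, json.dumps(cstack))

w = '<parse_expr:while_1 ? [2]>'
i = '<parse_expr:if_1 + 0#[3, -1]>'
assert unparse_name(*parse_name(w)) == w
assert unparse_name(*parse_name(i)) == i
```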
```python
%top assert all(unparse_name(*parse_name(w[0])) == w[0] for w in while_loops)
```

### Propagate rename of the `while` node up the child nodes

The `update_stack()` function, when given a node and a new name, recursively updates the method stack in the children.
```python
def update_stack(node, at, new_name):
    nname, children, *rest = node
    if not (':if_' in nname or ':while_' in nname):
        return
    method, ctrl, cname, num, can_empty, cstack = parse_name(nname)
    cstack[at] = new_name
    name = unparse_name(method, ctrl, cname, num, can_empty, cstack)
    # assert '?' not in name
    node[0] = name
    for c in children:
        update_stack(c, at, new_name)
```

Update the node name once we have identified that it corresponds to a global name.
```python
def update_name(k_m, my_id, seen):
    # fixup k_m with what is in my_id, and update seen.
    original = k_m[0]
    method, ctrl, cname, num, can_empty, cstack = parse_name(original)
    # assert can_empty != '?'
    cstack[-1] = float('%d.0' % my_id)
    name = unparse_name(method, ctrl, cname, num, can_empty, cstack)
    seen[k_m[0]] = name
    k_m[0] = name
    # only replace it at the len(cstack) - 1
    # until the first non-cf token
    for c in k_m[1]:
        update_stack(c, len(cstack) - 1, cstack[-1])
    return name, k_m
```

Note that the rename happens only within the current method stack. That is, it does not propagate across method calls. Here is how one would use it.
```python
%top while_loops[2]
```

We update the iteration number `3` with a global id `4.0`.
```python
%top name, node = update_name(while_loops[2], 4, {})
%top node
```

##### Replace a set of nodes

We want to replace the `while` loop iteration identifiers with a global identifier. For that, we are given a list of nodes that are compatible with global ones. We first extract the iteration id from the global node and apply it to the `while` node under consideration.
```python
def replace_stack_and_mark_star(to_replace):
    # remember, we only replace whiles.
    for (i, j) in to_replace:
        method1, ctrl1, cname1, num1, can_empty1, cstack1 = parse_name(i[0])
        method2, ctrl2, cname2, num2, can_empty2, cstack2 = parse_name(j[0])
        assert method1 == method2
        assert ctrl1 == ctrl2
        assert cname1 == cname2
        # assert can_empty2 != '?'
        # fixup the can_empty
        new_name = unparse_name(method1, ctrl1, cname1, num1, can_empty2, cstack1)
        i[0] = new_name
        assert len(cstack1) == len(cstack2)
        update_stack(i, len(cstack2) - 1, cstack2[-1])
    to_replace.clear()
```

### Generalize a given set of loops

The main workhorse. It generalizes the looping constructs. It is given a set of `while` loops with the same label under the current node. TODO: Refactor when we actually have time.
##### Helper: node inclusion

Checking for node inclusion. We do not want to try including a first node in a second if the first node already contains the second; it would lead to an infinite loop in `tree_to_string()`.
```python
def node_include(i, j):
    name_i, children_i, s_i, e_i = i
    name_j, children_j, s_j, e_j = j
    return s_i <= s_j and e_i >= e_j
```

##### Helper: sorting

Ordering nodes by their highest complexity to avoid spurious can-replace answers.
```python
def num_tokens(v, s):
    name, child, *rest = v
    s.add(name)
    [num_tokens(i, s) for i in child]
    return len(s)

def s_fn(v):
    return num_tokens(v[0], set())
```

```python
MAX_SAMPLES = 1  # with reasonably complex inputs, this is sufficient if we do the surgery both ways.
```

First, we check whether any of the loops we have are compatible with the globally registered loops in `while_register`.
```python
def check_registered_loops_for_compatibility(idx_map, while_register, module):
    seen = {}
    to_replace = []
    idx_keys = sorted(idx_map.keys())
    for while_key, f in while_register[0]:
        # try sampling here.
        my_values = while_register[0][(while_key, f)]
        v_ = random.choice(my_values)
        for k in idx_keys:
            k_m = idx_map[k]
            if k_m[0] in seen:
                continue
            if len(my_values) > MAX_SAMPLES:
                lst = [v for v in my_values if not node_include(v[0], k_m)]
                values = sorted(lst, key=s_fn, reverse=True)[0:MAX_SAMPLES]
            else:
                values = my_values
            # all values in v should be tried.
            replace = 0
            for v in values:
                assert v[0][0] == v_[0][0]
                if f != FILE or not node_include(v[0], k_m):
                    # if not k_m includes v
                    a = is_compatible((k_m, FILE, TREE), v, module)
                    if not a:
                        replace = 0
                        break
                    else:
                        replace += 1
                if f != FILE or not node_include(k_m, v[0]):
                    b = is_compatible(v, (k_m, FILE, TREE), module)
                    if not b:
                        replace = 0
                        break
                    else:
                        replace += 1
            # at least one needs to vouch, and all capable need to agree.
            if replace:
                to_replace.append((k_m, v_[0]))  # <- replace k_m by v
                seen[k_m[0]] = True
    replace_stack_and_mark_star(to_replace)
```

Next, for all the loops that remain, check if they can be deleted. If they can be, we want to place `Epsilon == *` in place of `?` in the `can_empty` position.
```python
def can_the_loop_be_deleted(idx_map, while_register, module):
    idx_keys = sorted(idx_map.keys())
    for i in idx_keys:
        i_m = idx_map[i]
        if '.0' in i_m[0]:
            # assert '?' not in i_m[0]
            continue
        a = is_compatible((i_m, FILE, TREE), (['', [], 0, 0], FILE, TREE), module)
        method1, ctrl1, cname1, num1, can_empty, cstack1 = parse_name(i_m[0])
        name = unparse_name(method1, ctrl1, cname1, num1, Epsilon if a else NoEpsilon, cstack1)
        i_m[0] = name
```

Next, we check all current loops for compatibility with each other. Essentially, we start from the back, and check whether the first, second, third, ... nodes are compatible with the last node. Then we take the second-to-last node and do the same. If they are compatible, we use the same name for all compatible nodes.
```python
def check_current_loops_for_compatibility(idx_map, while_register, module):
    to_replace = []
    rkeys = sorted(idx_map.keys(), reverse=True)
    for i in rkeys:  # <- nodes to check for replacement -- started from the back
        i_m = idx_map[i]
        # assert '?' not in i_m[0]
        if '.0' in i_m[0]:
            continue
        j_keys = sorted([j for j in idx_map.keys() if j < i])
        for j in j_keys:  # <- nodes that we can replace i_m with -- starting from front.
            j_m = idx_map[j]
            # assert '?' not in j_m[0]
            if i_m[0] == j_m[0]:
                break  # previous whiles worked.
            replace = False
            if not node_include(j_m, i_m):
                a = is_compatible((i_m, FILE, TREE), (j_m, FILE, TREE), module)
                if not a:
                    continue
                replace = True
            if not node_include(i_m, j_m):
                b = is_compatible((j_m, FILE, TREE), (i_m, FILE, TREE), module)
                if not b:
                    continue
                replace = True
            if replace:
                to_replace.append((i_m, j_m))  # <- replace i_m by j_m
                break
    replace_stack_and_mark_star(to_replace)
```

Finally, register all the new while loops discovered.
```python
def register_new_loops(idx_map, while_register):
    idx_keys = sorted(idx_map.keys())
    seen = {}
    for k in idx_keys:
        k_m = idx_map[k]
        if ".0" not in k_m[0]:
            if k_m[0] in seen:
                k_m[0] = seen[k_m[0]]
                # and update
                method1, ctrl1, cname1, num1, can_empty1, cstack1 = parse_name(k_m[0])
                update_name(k_m, cstack1[-1], seen)
                continue
            # new! get a brand new name!
            while_register[1] += 1
            my_id = while_register[1]
            original_name = k_m[0]
            # assert '?' not in original_name
            name, new_km = update_name(k_m, my_id, seen)
            # assert '?' not in name
            while_register[0][(name, FILE)] = [(new_km, FILE, TREE)]
        else:
            name = k_m[0]
            if (name, FILE) not in while_register[0]:
                while_register[0][(name, FILE)] = []
            while_register[0][(name, FILE)].append((k_m, FILE, TREE))
```

All together.
```python
def generalize_loop(idx_map, while_register, module):
    # First we check the previous while loops
    check_registered_loops_for_compatibility(idx_map, while_register, module)
    # Check whether any of these can be deleted.
    can_the_loop_be_deleted(idx_map, while_register, module)
    # then we check the current while iterations
    check_current_loops_for_compatibility(idx_map, while_register, module)
    # lastly, update all while names.
    register_new_loops(idx_map, while_register)
```

We keep a global registry of nodes, so that we can use the same iteration labels.
```python
# NODE_REGISTER = {}
```

### Collect loops to generalize

The idea is to look through the tree, looking for while loops. When one sees a while loop, start at one end, and see if the while iteration index can be replaced by the first one, and vice versa. If not, try with the second one and so on until one succeeds. When one succeeds, replace the definition of the matching one with an alternate with the last's definition, replace the name of the last with the first, and delete the last. Here, we only collect the while loops with the same label, with `generalize_loop()` doing the rest.
```python
def generalize(tree, module):
    node, children, *_rest = tree
    if node not in NODE_REGISTER:
        NODE_REGISTER[node] = {}
    register = NODE_REGISTER[node]
    for child in children:
        generalize(child, module)
    idxs = {}
    last_while = None
    for i, child in enumerate(children):
        # now we need to map the while_name here to the ones in node
        # register. Essentially, we try to replace each.
        if ':while_' not in child[0]:
            continue
        while_name = child[0].split(' ')[0]
        if last_while is None:
            last_while = while_name
        if while_name not in register:
            register[while_name] = [{}, 0]
        else:
            if last_while != while_name:
                # a new while! Generalize the last
                last_while = while_name
                generalize_loop(idxs, register[last_while], module)
        idxs[i] = child
    if last_while is not None:
        generalize_loop(idxs, register[last_while], module)
```

We need the ability for fairly deep surgery. So we dump and load the mined trees to convert tuples to arrays.
```python
def generalize_iter(jtrees, log=False):
    global TREE, FILE
    new_trees = []
    for j in jtrees:
        FILE = j['arg']
        if log:
            print(FILE, file=sys.stderr)
            sys.stderr.flush()
        TREE = to_modifiable(j['tree'])
        generalize(TREE, j['original'])
        j['tree'] = TREE
        new_trees.append(copy.deepcopy(j))
    return new_trees
```

```python
from fuzzingbook.GrammarFuzzer import extract_node as extract_node_o
```

```python
%top reset_generalizer()
%top generalized_calc_trees = generalize_iter(mined_calc_trees)
%top zoom(display_tree(generalized_calc_trees[0]['tree'], extract_node=extract_node_o))
```

```python
%top reset_generalizer()
%top generalized_mathexpr_trees = generalize_iter(mined_mathexpr_trees)
%top zoom(display_tree(generalized_mathexpr_trees[1]['tree'], extract_node=extract_node_o))
```

## Generating a Grammar

Generating a grammar from the generalized derivation trees is pretty simple. Start at the start node; any node that represents a method or a pseudo-method becomes a nonterminal, and its children form alternate expansions for that nonterminal. Since all the keys are compatible, merging the grammars is simply merging the hash maps.
First, we define a pretty printer for grammar.
```python
import re
RE_NONTERMINAL = re.compile(r'(<[^<> ]*>)')
```

```python
def recurse_grammar(grammar, key, order, canonical):
    rules = sorted(grammar[key])
    old_len = len(order)
    for rule in rules:
        if not canonical:
            res = re.findall(RE_NONTERMINAL, rule)
        else:
            res = rule
        for token in res:
            if token.startswith('<') and token.endswith('>'):
                if token not in order:
                    order.append(token)
    new = order[old_len:]
    for ckey in new:
        recurse_grammar(grammar, ckey, order, canonical)
```

```python
def show_grammar(grammar, start_symbol='<START>', canonical=True):
    order = [start_symbol]
    recurse_grammar(grammar, start_symbol, order, canonical)
    assert len(order) == len(grammar.keys())
    return {k: sorted(grammar[k]) for k in order}
```

```python
def to_grammar(tree, grammar):
    node, children, _, _ = tree
    tokens = []
    if node not in grammar:
        grammar[node] = list()
    for c in children:
        if c[1] == []:
            tokens.append(c[0])
        else:
            tokens.append(c[0])
            to_grammar(c, grammar)
    grammar[node].append(tuple(tokens))
    return grammar
```

```python
def merge_grammar(g1, g2):
    all_keys = set(list(g1.keys()) + list(g2.keys()))
    merged = {}
    for k in all_keys:
        alts = set(g1.get(k, []) + g2.get(k, []))
        merged[k] = alts
    return {k: [l for l in merged[k]] for k in merged}
```

```python
def convert_to_grammar(my_trees):
    grammar = {}
    for my_tree in my_trees:
        tree = my_tree['tree']
        src = my_tree['original']
        g = to_grammar(tree, grammar)
        grammar = merge_grammar(grammar, g)
    return grammar
```

```python
%top calc_grammar = convert_to_grammar(generalized_calc_trees)
%top show_grammar(calc_grammar)
```

```python
%top mathexpr_grammar = convert_to_grammar(generalized_mathexpr_trees)
%top show_grammar(mathexpr_grammar)
```

The grammar generated may still contain meta-characters such as `<` and `>`. We need to clean these up to make it a grammar that is fuzzable using the Fuzzingbook fuzzers.
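The merge step really is just a key-by-key union of alternatives, which a tiny invented pair of grammars makes concrete (the function is restated in condensed form so the sketch runs standalone):

```python
def merge_grammar(g1, g2):
    # union the alternatives of both grammars, key by key
    all_keys = set(list(g1.keys()) + list(g2.keys()))
    return {k: list(set(g1.get(k, []) + g2.get(k, []))) for k in all_keys}

g1 = {'<START>': [('<expr>',)], '<expr>': [('1',)]}
g2 = {'<expr>': [('1',), ('2',)]}
g = merge_grammar(g1, g2)
print(sorted(g['<expr>']))  # → [('1',), ('2',)]
```

Because the generalization step gave compatible nodes the same label, the union never has to reconcile two different definitions of the same nonterminal.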
```python
def to_fuzzable_grammar(grammar):
    def escape(t):
        if (t[0] + t[-1]) == '<>':
            return t.replace(' ', '_')
        else:
            return t
    new_g = {}
    for k in grammar:
        new_alt = []
        for rule in grammar[k]:
            new_alt.append(''.join([escape(t) for t in rule]))
        new_g[k.replace(' ', '_')] = new_alt
    return new_g
```

```python
from fuzzingbook import GrammarFuzzer
```

```python
%%top
gf = GrammarFuzzer.GrammarFuzzer(to_fuzzable_grammar(calc_grammar), start_symbol='<START>')
for i in range(10):
    print(gf.fuzz())
```

```python
%%top
gf = GrammarFuzzer.GrammarFuzzer(to_fuzzable_grammar(mathexpr_grammar), start_symbol='<START>')
for i in range(10):
    print(gf.fuzz())
```

### Inserting Empty Alternatives for IF and Loops

Next, we want to insert empty rules for those loops and conditionals that can be skipped. For loops, the entire sequence has to contain the empty marker.
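The transformation we are after can be sketched on a toy rule set (the key name `<while_1>` is invented for illustration): a loop nonterminal whose body may execute zero times gets an extra empty alternative so derivations can skip it.

```python
# Hypothetical loop nonterminal: one or more digits.
grammar = {'<while_1>': [('<digit>', '<while_1>'), ('<digit>',)]}

def add_empty_alternative(grammar, key):
    # A rule consisting of the single empty token ('',) lets a derivation
    # terminate without entering the loop body at all.
    if ('',) not in grammar[key]:
        grammar[key] = grammar[key] + [('',)]
    return grammar

grammar = add_empty_alternative(grammar, '<while_1>')
```

The real `check_empty_rules()` below applies this only to keys whose control-flow markers indicate a skippable region.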
```python
def check_empty_rules(grammar):
    new_grammar = {}
    for k in grammar:
        if ':if_' in k:
            name, marker = k.split('#')
            if name.endswith(' *'):
                new_grammar[k] = grammar[k] + [('',)]
            else:
                new_grammar[k] = grammar[k]
        elif ':while_' in k:
            # TODO -- we have to check the rules for sequences of whiles.
            # For now, ignore.
            new_grammar[k] = grammar[k]
        else:
            new_grammar[k] = grammar[k]
    return new_grammar
```

```python
%top ne_calc_grammar = check_empty_rules(calc_grammar)
%top show_grammar(ne_calc_grammar)
```

```python
%%top
gf = GrammarFuzzer.GrammarFuzzer(to_fuzzable_grammar(ne_calc_grammar), start_symbol='<START>')
for i in range(10):
    print(repr(gf.fuzz()))
```

```python
%top ne_mathexpr_grammar = check_empty_rules(mathexpr_grammar)
%top show_grammar(ne_mathexpr_grammar)
```

```python
%%top
gf = GrammarFuzzer.GrammarFuzzer(to_fuzzable_grammar(ne_mathexpr_grammar), start_symbol='<START>')
for i in range(10):
    print(repr(gf.fuzz()))
```

We now need to generalize the loops. The idea is to look for patterns exclusively in the similarly named while loops using any of the regular expression learners. For the prototype, we replaced the modified Sequitur with the modified Fernau algorithm, which gave us better regular expressions than before. The main constraint we have is that we want to avoid repeated execution of the program if possible. The Fernau algorithm can recover a reasonably approximate regular expression based only on positive data.
#### The modified Fernau algorithm

The Fernau algorithm is from _Algorithms for learning regular expressions from positive data_ by _Henning Fernau_. Our algorithm uses a modified form of the Prefix-Tree-Acceptor from Fernau. First, we define an LRF buffer of a given size.
```python
import json

class Buf:
    def __init__(self, size):
        self.size = size
        self.items = [None] * self.size
```

The `add1()` method takes in an array, transfers the first element of the array to the end of the current buffer, and simultaneously drops the first element of the buffer.
```python
class Buf(Buf):
    def add1(self, items):
        self.items.append(items.pop(0))
        return self.items.pop(0)
```

For equality between the buffer and an array, we only compare when both the array and the buffer items are actual elements and not chunked arrays.
```python
class Buf(Buf):
    def __eq__(self, items):
        if any(isinstance(i, dict) for i in self.items):
            return False
        if any(isinstance(i, dict) for i in items):
            return False
        return items == self.items
```

The `detect_chunks()` function detects any repeating portions of size `n` in a list.
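The buffer-based detection is, in spirit, a comparison of adjacent length-`n` windows. A minimal standalone sketch without the `Buf` class:

```python
def repeated_chunks(lst, n):
    # Collect every length-n window that is immediately repeated by the
    # following n elements -- the sliding-buffer idea in direct form.
    chunks = set()
    for i in range(len(lst) - 2 * n + 1):
        if lst[i:i + n] == lst[i + n:i + 2 * n]:
            chunks.add(tuple(lst[i:i + n]))
    return chunks

print(repeated_chunks(['a', 'b', 'a', 'b', 'c'], 2))  # {('a', 'b')}
```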
```python
def detect_chunks(n, lst_):
    lst = list(lst_)
    chunks = set()
    last = Buf(n)
    # Check if the next n elements are repeated.
    for _ in range(len(lst) - n):
        lnext_n = lst[0:n]
        if last == lnext_n:
            # Found a repetition.
            chunks.add(tuple(last.items))
        else:
            pass
        last.add1(lst)
    return chunks
```

Once we have detected plausible repeating sequences, we gather all similar sequences into arrays.
```python
def chunkify(lst_, n, chunks):
    lst = list(lst_)
    chunked_lst = []
    while len(lst) >= n:
        lnext_n = lst[0:n]
        if (not any(isinstance(i, dict) for i in lnext_n)) and tuple(lnext_n) in chunks:
            chunked_lst.append({'_': lnext_n})
            lst = lst[n:]
        else:
            chunked_lst.append(lst.pop(0))
    chunked_lst.extend(lst)
    return chunked_lst
```

The `identify_chunks()` function simply calls `detect_chunks()` on all given lists, and then converts all chunks identified into arrays.
```python
def identify_chunks(my_lsts):
    # Initialize.
    all_chunks = {}
    maximum = max(len(lst) for lst in my_lsts)
    for i in range(1, maximum//2 + 1):
        all_chunks[i] = set()
    # First, identify chunks in each list.
    for lst in my_lsts:
        for i in range(1, maximum//2 + 1):
            chunks = detect_chunks(i, lst)
            all_chunks[i] |= chunks
    # Then, chunkify.
    new_lsts = []
    for lst in my_lsts:
        for i in range(1, maximum//2 + 1):
            chunks = all_chunks[i]
            lst = chunkify(lst, i, chunks)
        new_lsts.append(lst)
    return new_lsts
```

##### Prefix tree acceptor

The prefix tree acceptor is a way to represent positive data. The `Node` class holds a single node in the prefix tree acceptor.
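Before the full `Node` class, the core idea of a prefix tree acceptor can be sketched with plain nested dicts (the samples here are invented): every distinct prefix becomes a path, and a marker records where a sample ends.

```python
def build_pta(samples):
    # Each distinct prefix becomes a path; '$' marks where a sample ends.
    root = {}
    for s in samples:
        node = root
        for ch in s:
            node = node.setdefault(ch, {})
        node['$'] = True
    return root

pta = build_pta(['ab', 'ac'])  # shared 'a' prefix, branching into 'b' and 'c'
```

The `Node` class below additionally tracks repetition counts per node, which the grammar construction later uses to decide where to introduce star rules.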
```python
class Node:
    # Each tree node gets its unique id.
    _uid = 0

    def __init__(self, item):
        self.count = 1  # how many repetitions.
        self.counters = set()
        self.last = False
        self.children = []
        self.item = item
        self.uid = Node._uid
        Node._uid += 1

    def update_counters(self):
        self.counters.add(self.count)
        self.count = 0
        for c in self.children:
            c.update_counters()

    def __repr__(self):
        return str(self.to_json())

    def __str__(self):
        return "(%s, [%s])" % (self.item, ' '.join([str(i) for i in self.children]))

    def to_json(self):
        s = ("(%s)" % ' '.join(self.item['_'])) if isinstance(self.item, dict) else str(self.item)
        return (s, tuple(self.counters), [i.to_json() for i in self.children])

    def inc_count(self):
        self.count += 1

    def add_ref(self):
        self.count = 1

    def get_child(self, c):
        for i in self.children:
            if i.item == c:
                return i
        return None

    def add_child(self, c):
        # First check if it is the current node. If it is, increment
        # the count, and return ourselves.
        if c == self.item:
            self.inc_count()
            return self
        else:
            # Check if it is one of the children. If it is a child, then
            # preserve its original count.
            nc = self.get_child(c)
            if nc is None:
                nc = Node(c)
                self.children.append(nc)
            else:
                nc.add_ref()
            return nc
```

```python
def update_tree(lst_, root):
    lst = list(lst_)
    branch = root
    while lst:
        first, *lst = lst
        branch = branch.add_child(first)
    branch.last = True
    return root

def create_tree_with_lsts(lsts):
    Node._uid = 0
    root = Node(None)
    for lst in lsts:
        root.count = 1  # there is at least one element.
        update_tree(lst, root)
        root.update_counters()
    return root

def get_star(node, key):
    if node.item is None:
        return ''
    if isinstance(node.item, dict):
        # Take care of counters.
        elements = node.item['_']
        my_key = "<%s-%d-s>" % (key, node.uid)
        alts = [elements]
        if len(node.counters) > 1:  # repetition
            alts.append(elements + [my_key])
        return [my_key], {my_key: alts}
    else:
        return [str(node.item)], {}

def node_to_grammar(node, grammar, key):
    rule = []
    alts = [rule]
    if node.uid == 0:
        my_key = "<%s>" % key
    else:
        my_key = "<%s-%d>" % (key, node.uid)
    grammar[my_key] = alts
    if node.item is not None:
        mk, g = get_star(node, key)
        rule.extend(mk)
        grammar.update(g)
    # Is the node last?
    if node.last:
        assert node.item is not None
        # Add a duplicate rule that ends here.
        ending_rule = list(rule)
        # If there are no children, the current rule is
        # anyway ending.
        if node.children:
            alts.append(ending_rule)
    if node.children:
        if len(node.children) > 1:
            my_ckey = "<%s-%d-c>" % (key, node.uid)
            rule.append(my_ckey)
            grammar[my_ckey] = [["<%s-%d>" % (key, c.uid)] for c in node.children]
        else:
            my_ckey = "<%s-%d>" % (key, node.children[0].uid)
            rule.append(my_ckey)
    else:
        pass
    for c in node.children:
        node_to_grammar(c, grammar, key)
    return grammar

def generate_grammar(lists, key):
    lsts = identify_chunks(lists)
    tree = create_tree_with_lsts(lsts)
    grammar = {}
    node_to_grammar(tree, grammar, key)
    return grammar
```

Given a rule, determine the abstraction for it.
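The repetition rules that `generate_grammar()` emits follow a right-recursive pattern. A sketch of that shape, with invented key names and a hypothetical helper:

```python
def star_rule(key, elements):
    # A repeated chunk becomes a right-recursive rule: one alternative with
    # the elements alone, and one with the elements followed by the rule
    # itself -- a '+'-style repetition in BNF form.
    return {key: [list(elements), list(elements) + [key]]}

g = star_rule('<digits>', ['<digit>'])
# {'<digits>': [['<digit>'], ['<digit>', '<digits>']]}
```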
```python
def collapse_alts(rules, k):
    ss = [[str(r) for r in rule] for rule in rules]
    x = generate_grammar(ss, k[1:-1])
    return x
```

```python
def collapse_rules(grammar):
    r_grammar = {}
    for k in grammar:
        new_grammar = collapse_alts(grammar[k], k)
        # Merge the new_grammar with r_grammar.
        # We know none of the keys exist in r_grammar because
        # new keys are k-prefixed.
        for k_ in new_grammar:
            r_grammar[k_] = new_grammar[k_]
    return r_grammar
```

```python
%top collapsed_calc_grammar = collapse_rules(ne_calc_grammar)
%top show_grammar(collapsed_calc_grammar)
```

```python
%%top
gf = GrammarFuzzer.GrammarFuzzer(to_fuzzable_grammar(ne_mathexpr_grammar), start_symbol='<START>')
for i in range(10):
    print(gf.fuzz())
```

```python
%top collapsed_mathexpr_grammar = collapse_rules(ne_mathexpr_grammar)
%top show_grammar(collapsed_mathexpr_grammar)
```

```python
%%top
gf = GrammarFuzzer.GrammarFuzzer(to_fuzzable_grammar(collapsed_mathexpr_grammar), start_symbol='<START>')
for i in range(10):
    print(gf.fuzz())
```

```python
%%top
gf = GrammarFuzzer.GrammarFuzzer(to_fuzzable_grammar(collapsed_calc_grammar), start_symbol='<START>')
for i in range(10):
    print(gf.fuzz())
```

```python
def convert_spaces(grammar):
    keys = {key: key.replace(' ', '_') for key in grammar}
    new_grammar = {}
    for key in grammar:
        new_alt = []
        for rule in grammar[key]:
            new_rule = []
            for t in rule:
                for k in keys:
                    t = t.replace(k, keys[k])
                new_rule.append(t)
            new_alt.append(''.join(new_rule))
        new_grammar[keys[key]] = new_alt
    return new_grammar
```

```python
%top calc_grammar = convert_spaces(collapsed_calc_grammar)
%top show_grammar(calc_grammar, canonical=False)
```

```python
from fuzzingbook import GrammarFuzzer, GrammarMiner, Parser
```

```python
%top gf = GrammarFuzzer.GrammarFuzzer(calc_grammar, start_symbol='<START>')
```

```python
%%top
for i in range(10):
    print(gf.fuzz())
```

```python
def first_in_chain(token, chain):
    while True:
        if token in chain:
            token = chain[token]
        else:
            break
    return token
```

Return a new symbol for `grammar` based on `symbol_name`.
```python
def new_symbol(grammar, symbol_name="<symbol>"):
    if symbol_name not in grammar:
        return symbol_name
    count = 1
    while True:
        tentative_symbol_name = symbol_name[:-1] + "-" + repr(count) + ">"
        if tentative_symbol_name not in grammar:
            return tentative_symbol_name
        count += 1
```

Replace keys that have a single token definition with the token in the definition.
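The inlining idea can be sketched on a toy grammar (invented for illustration). Note that unlike `first_in_chain()`, this sketch does not follow replacement chains; it is a single-step simplification.

```python
# Hypothetical grammar: <digits> has a single rule that is a single nonterminal.
grammar = {
    '<START>': ['<digits>'],
    '<digits>': ['<digit>'],
    '<digit>': ['0', '1'],
}

def inline_single_token(grammar, start='<START>'):
    # Keys (other than the start symbol) whose only rule is exactly one
    # nonterminal can be replaced by that nonterminal everywhere.
    repl = {k: rs[0] for k, rs in grammar.items()
            if k != start and len(rs) == 1 and rs[0] in grammar}
    new_g = {}
    for k, rs in grammar.items():
        if k in repl:
            continue  # the inlined definition itself is dropped
        for old, new in repl.items():
            rs = [r.replace(old, new) for r in rs]
        new_g[k] = rs
    return new_g

print(inline_single_token(grammar))
# {'<START>': ['<digit>'], '<digit>': ['0', '1']}
```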
```python
def replacement_candidates(grammar):
    to_replace = {}
    for k in grammar:
        if len(grammar[k]) != 1:
            continue
        if k in {'<START>', '<main>'}:
            continue
        rule = grammar[k][0]
        res = re.findall(RE_NONTERMINAL, rule)
        if len(res) == 1:
            if len(res[0]) != len(rule):
                continue
            to_replace[k] = first_in_chain(res[0], to_replace)
        elif len(res) == 0:
            to_replace[k] = first_in_chain(rule, to_replace)
        else:
            continue  # more than one.
    return to_replace
```

```python
def replace_key_by_new_key(grammar, keys_to_replace):
    new_grammar = {}
    for key in grammar:
        new_rules = []
        for rule in grammar[key]:
            for k in keys_to_replace:
                rule = rule.replace(k, keys_to_replace[k])
            new_rules.append(rule)
        new_grammar[keys_to_replace.get(key, key)] = new_rules
    assert len(grammar) == len(new_grammar)
    return new_grammar
```

```python
def replace_key_by_key(grammar, keys_to_replace):
    new_grammar = {}
    for key in grammar:
        if key in keys_to_replace:
            continue
        new_rules = []
        for rule in grammar[key]:
            for k in keys_to_replace:
                rule = rule.replace(k, keys_to_replace[k])
            new_rules.append(rule)
        new_grammar[key] = new_rules
    return new_grammar
```

```python
def remove_single_entries(grammar):
    keys_to_replace = replacement_candidates(grammar)
    return replace_key_by_key(grammar, keys_to_replace)
```

Remove keys that have similar rules.
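The key-merging idea in miniature (single pass only, whereas the notebook iterates to a fixpoint; the grammar is invented for illustration): keys with identical rule sets are redundant, so one representative is kept and references to the others are rewritten.

```python
def merge_duplicate_keys(grammar):
    canon, repl = {}, {}
    for k, rs in grammar.items():
        salt = str(sorted(rs))
        if salt in canon:
            repl[k] = canon[salt]   # duplicate: map it to the representative
        else:
            canon[salt] = k
    new_g = {}
    for k, rs in grammar.items():
        if k in repl:
            continue  # drop the duplicate definition
        for old, new in repl.items():
            rs = [r.replace(old, new) for r in rs]
        new_g[k] = rs
    return new_g

g = merge_duplicate_keys({
    '<START>': ['<a><b>'],
    '<a>': ['x', 'y'],
    '<b>': ['x', 'y'],   # same rules as <a>; folded into <a>
})
print(g)  # {'<START>': ['<a><a>'], '<a>': ['x', 'y']}
```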
```python
def collect_duplicate_rule_keys(grammar):
    collect = {}
    for k in grammar:
        salt = str(sorted(grammar[k]))
        if salt not in collect:
            collect[salt] = (k, set())
        else:
            collect[salt][1].add(k)
    return collect
```

```python
def remove_duplicate_rule_keys(grammar):
    g = grammar
    while True:
        collect = collect_duplicate_rule_keys(g)
        keys_to_replace = {}
        for salt in collect:
            k, st = collect[salt]
            for s in st:
                keys_to_replace[s] = k
        if not keys_to_replace:
            break
        g = replace_key_by_key(g, keys_to_replace)
    return g
```

Remove all the control-flow vestiges from names, and simply name them sequentially.
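The renaming in miniature (key names invented; the notebook does this with `new_symbol()` plus `replace_key_by_new_key()`): everything after the `:` control-flow marker is dropped, with a numeric suffix appended if the base name is already taken.

```python
def strip_control_flow(key, existing):
    # '<parse_expr:if_1>' -> '<parse_expr>', or '<parse_expr-1>' if taken.
    base = key.split(':')[0] + '>'
    if base not in existing:
        return base
    count = 1
    while '%s-%d>' % (base[:-1], count) in existing:
        count += 1
    return '%s-%d>' % (base[:-1], count)

print(strip_control_flow('<parse_expr:if_1>', {'<parse_expr>'}))  # <parse_expr-1>
```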
```python
import copy

def collect_replacement_keys(grammar):
    g = copy.deepcopy(grammar)
    to_replace = {}
    for k in grammar:
        if ':' in k:
            first, rest = k.split(':')
            sym = new_symbol(g, symbol_name=first + '>')
            assert sym not in g
            g[sym] = None
            to_replace[k] = sym
        else:
            continue
    return to_replace
```

```python
def cleanup_tokens(grammar):
    keys_to_replace = collect_replacement_keys(grammar)
    g = replace_key_by_new_key(grammar, keys_to_replace)
    return g
```

```python
def replaceAngular(grammar):
    new_g = {}
    replaced = False
    for k in grammar:
        new_rules = []
        for rule in grammar[k]:
            new_rule = rule.replace('<>', '<openA><closeA>').replace('</>', '<openA>/<closeA>')
            if rule != new_rule:
                replaced = True
            new_rules.append(new_rule)
        new_g[k] = new_rules
    if replaced:
        new_g['<openA>'] = ['<']
        new_g['<closeA>'] = ['>']
    return new_g
```

Remove keys that are referred to only from a single rule, and which have a single alternative.

Important: this can't work on the canonical representation. First, given a key, we figure out its distance to `<START>`.

This is different from `remove_single_entries()` in that, there, we do not care if the key is being used multiple times. Here, we only replace keys that are referred to only once.
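The distance computation can be sketched standalone (the `parents` map here is invented for illustration): the distance of a key is one more than the minimum distance of any of its parents, with cycles mapped to infinity.

```python
# Hypothetical child -> [parents] map.
parents = {'<a>': ['<START>'], '<b>': ['<a>'], '<c>': ['<b>', '<START>']}

def dist_to_start(item, parents, seen=frozenset()):
    # Shortest number of parent steps back to <START>; cycles give infinity.
    if item == '<START>':
        return 0
    if item in seen:
        return float('inf')
    return 1 + min(dist_to_start(p, parents, seen | {item}) for p in parents[item])

print(dist_to_start('<c>', parents))  # 1 -- <START> is a direct parent
print(dist_to_start('<b>', parents))  # 2 -- via <a>
```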
```python
import math
```

```python
def len_to_start(item, parents, seen=None):
    if seen is None:
        seen = set()
    if item in seen:
        return math.inf
    seen.add(item)
    if item == '<START>':
        return 0
    else:
        return 1 + min(len_to_start(p, parents, seen) for p in parents[item])
```

```python
def order_by_length_to_start(items, parents):
    return sorted(items, key=lambda i: len_to_start(i, parents))
```

Next, we generate a map of `child -> [parents]`.
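A standalone sketch of building such a map on a toy grammar (invented for illustration); note that a key referenced twice from the same rule is recorded once per occurrence, as in `id_parents()` below.

```python
import re

RE_NT = re.compile(r'(<[^<> ]*>)')

def parent_map(grammar):
    # For every nonterminal, record which keys reference it in some rule.
    parents = {}
    for key, rules in grammar.items():
        for rule in rules:
            for token in re.findall(RE_NT, rule):
                parents.setdefault(token, []).append(key)
    return parents

pm = parent_map({'<START>': ['<a><a>'], '<a>': ['x']})
print(pm)  # {'<a>': ['<START>', '<START>']}
```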
```python
def id_parents(grammar, key, seen=None, parents=None):
    if parents is None:
        parents = {}
        seen = set()
    if key in seen:
        return
    seen.add(key)
    for rule in grammar[key]:
        res = re.findall(RE_NONTERMINAL, rule)
        for token in res:
            if token.startswith('<') and token.endswith('>'):
                if token not in parents:
                    parents[token] = list()
                parents[token].append(key)
    for ckey in {i for i in grammar if i not in seen}:
        id_parents(grammar, ckey, seen, parents)
    return parents
```

Now, all together.
```python
def remove_single_alts(grammar, start_symbol='<START>'):
    single_alts = {p for p in grammar if len(grammar[p]) == 1 and p != start_symbol}
    child_parent_map = id_parents(grammar, start_symbol)
    single_refs = {p: child_parent_map[p] for p in single_alts
                   if len(child_parent_map[p]) <= 1}
    keys_to_replace = {p: grammar[p][0]
                       for p in order_by_length_to_start(single_refs, child_parent_map)}
    g = replace_key_by_key(grammar, keys_to_replace)
    return g
```

```python
import os
import hashlib
```

```python
def accio_grammar(fname, src, samples, cache=True):
    hash_id = hashlib.md5(json.dumps(samples).encode()).hexdigest()
    cache_file = "build/%s_%s_generalized_tree.json" % (fname, hash_id)
    if os.path.exists(cache_file) and cache:
        with open(cache_file) as f:
            generalized_tree = json.load(f)
    else:
        # Regenerate the program.
        program_src[fname] = src
        with open('subjects/%s' % fname, 'w+') as f:
            print(src, file=f)
        resrc = rewrite(src, fname)
        with open('build/%s' % fname, 'w+') as f:
            print(resrc, file=f)
        os.makedirs('samples/%s' % fname, exist_ok=True)
        sample_files = {("samples/%s/%d.csv" % (fname, i)): s
                        for i, s in enumerate(samples)}
        for k in sample_files:
            with open(k, 'w+') as f:
                print(sample_files[k], file=f)
        call_trace = []
        for i in sample_files:
            thash_id = hashlib.md5(json.dumps(sample_files[i]).encode()).hexdigest()
            trace_cache_file = "build/%s_%s_trace.json" % (fname, thash_id)
            if os.path.exists(trace_cache_file) and cache:
                with open(trace_cache_file) as f:
                    my_tree = f.read()
            else:
                my_tree = do(["python", "./build/%s" % fname, i]).stdout
                with open(trace_cache_file, 'w+') as f:
                    print(my_tree, file=f)
            call_trace.append(json.loads(my_tree)[0])
        mined_tree = miner(call_trace)
        generalized_tree = generalize_iter(mined_tree)  # costly data structure.
        with open(cache_file, 'w+') as f:
            json.dump(generalized_tree, f)
    g = convert_to_grammar(generalized_tree)
    with open('build/%s_grammar_1.json' % fname, 'w+') as f:
        json.dump(g, f)
    g = check_empty_rules(g)
    with open('build/%s_grammar_2.json' % fname, 'w+') as f:
        json.dump(g, f)
    g = collapse_rules(g)  # <- regex learner
    with open('build/%s_grammar_3.json' % fname, 'w+') as f:
        json.dump(g, f)
    g = convert_spaces(g)
    with open('build/%s_grammar_4.json' % fname, 'w+') as f:
        json.dump(g, f)
    e = remove_single_alts(cleanup_tokens(remove_duplicate_rule_keys(remove_single_entries(g))))
    e = show_grammar(e, canonical=False)
    with open('build/%s_grammar.json' % fname, 'w+') as f:
        json.dump(e, f)
    return e
```

```python
%top calc_grammar = accio_grammar('calculator.py', VARS['calc_src'], ['(1+2)-2', '11'])
```

```python
%top calc_grammar
```

```python
%top gf = GrammarFuzzer.GrammarFuzzer(calc_grammar, start_symbol='<START>')
```

```python
%%top
for i in range(10):
    print(gf.fuzz())
```

## Libraries

We need a few instrumented supporting libraries.
xxxxxxxxxx%%var myio_srcxxxxxxxxxx# [(with open('build/myio.py', 'w+') as f: print(VARS['myio_src'], file=f)# )]%%var mylex_srcxxxxxxxxxx# [(with open('build/mylex.py', 'w+') as f: print(VARS['mylex_src'], file=f)# )]xxxxxxxxxximport fuzzingbookxxxxxxxxxxassert os.path.isfile('json.tar.gz') # for microjson validationxxxxxxxxxxMax_Precision = 1000Max_Recall = 1000Autogram = {}AutogramFuzz = {}AutogramGrammar = {}Mimid = {}MimidFuzz = {}MimidGrammar = {}MaxTimeout = 60*60 # 60 minutesMaxParseTimeout = 60*5CHECK = {'cgidecode','calculator', 'mathexpr', 'urlparse', 'netrc', 'microjson'}reset_generalizer()xxxxxxxxxxdef recover_grammar_with_taints(name, src, samples): header = '''import fuzzingbook.GrammarMinerfrom fuzzingbook.GrammarMiner import Tracerfrom fuzzingbook.InformationFlow import ostrfrom fuzzingbook.GrammarMiner import TaintedScopedGrammarMiner as TSGMfrom fuzzingbook.GrammarMiner import readableimport subjects.autogram_%simport fuzzingbookclass ostr_new(ostr): def __new__(cls, value, *args, **kw): return str.__new__(cls, value) def __init__(self, value, taint=None, origin=None, **kwargs): self.taint = taint if origin is None: origin = ostr.DEFAULT_ORIGIN if isinstance(origin, int): self.origin = list(range(origin, origin + len(self))) else: self.origin = origin #assert len(self.origin) == len(self) <-- bug fix here.class ostr_new(ostr_new): def create(self, res, origin=None): return ostr_new(res, taint=self.taint, origin=origin) def __repr__(self): # bugfix here. return str.__repr__(self) def recover_grammar_with_taints(fn, inputs, **kwargs): miner = TSGM() for inputstr in inputs: with Tracer(ostr_new(inputstr), **kwargs) as tracer: fn(tracer.my_input) miner.update_grammar(tracer.my_input, tracer.trace) return readable(miner.clean_grammar())def replaceAngular(grammar): # special handling for Autogram because it does not look for <> and </> # in rules, which messes up with parsing. 
new_g = {} replaced = False for k in grammar: new_rules = [] for rule in grammar[k]: new_rule = rule.replace('<>', '<openA><closeA>').replace('</>', '<openA>/<closeA>').replace('<lambda>','<openA>lambda<closeA>') if rule != new_rule: replaced = True new_rules.append(new_rule) new_g[k] = new_rules if replaced: new_g['<openA>'] = ['<'] new_g['<closeA>'] = ['<'] return new_gdef replace_start(grammar): assert '<start>' in grammar start = grammar['<start>'] del grammar['<start>'] grammar['<START>'] = start return replaceAngular(grammar) samples = [i.strip() for i in [%s] if i.strip()]import jsonautogram_grammar_t = recover_grammar_with_taints(subjects.autogram_%s.main, samples)print(json.dumps(replace_start(autogram_grammar_t)))''' mod_name = name.replace('.py','') with open('./subjects/autogram_%s' % name, 'w+') as f: print(src, file=f) with open('./build/autogram_%s' % name, 'w+') as f: print(header % (mod_name, ',\n'.join([repr(i) for i in samples]), mod_name), file=f) with ExpectTimeout(MaxTimeout): result = do(["python","./build/autogram_%s" % name], env={'PYTHONPATH':'.'}, log=True) if result.stderr.strip(): print(result.stderr, file=sys.stderr) return show_grammar(json.loads(result.stdout), canonical=False) return {}xxxxxxxxxxfrom fuzzingbook.Parser import IterativeEarleyParser### Check RecallHow many of the *valid* inputs from the golden grammar can be recognized by a parser using our grammar?How many of the valid inputs from the golden grammar can be recognized by a parser using our grammar?
```python
def check_recall(golden_grammar, my_grammar, validator, maximum=Max_Recall, log=False):
    my_count = maximum
    ie = IterativeEarleyParser(my_grammar, start_symbol='<START>')
    golden = GrammarFuzzer.GrammarFuzzer(golden_grammar, start_symbol='<START>')
    success = 0
    while my_count != 0:
        src = golden.fuzz()
        try:
            validator(src)
            my_count -= 1
            try:
                for tree in ie.parse(src):
                    success += 1
                    break
                if log:
                    print(maximum - my_count, '+', repr(src), success, file=sys.stderr)
            except:
                if log:
                    print(maximum - my_count, '-', repr(src), file=sys.stderr)
                pass
        except:
            pass
    return (success, maximum)
```

### Check Precision

How many of the inputs produced using our grammar are valid? (Accepted by the program.)
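In miniature, the precision measurement reduces to counting how many fuzzer outputs the subject accepts. The checker below is a hypothetical stand-in for running the real subject program:

```python
def precision(fuzz_outputs, program_accepts):
    # Fraction of inputs produced from the mined grammar that the
    # program under test accepts, as a (successes, total) pair.
    ok = sum(1 for s in fuzz_outputs if program_accepts(s))
    return ok, len(fuzz_outputs)

# Hypothetical subject: accepts only digit strings.
r = precision(['12', '3', 'x4'], lambda s: s.isdigit())
print(r)  # (2, 3)
```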
xxxxxxxxxxdef check_precision(name, grammar, maximum=Max_Precision, log=False): success = 0 with ExpectError(): fuzzer = GrammarFuzzer.GrammarFuzzer(grammar, start_symbol='<START>') for i in range(maximum): v = fuzzer.fuzz() c = check(v, name) success += (1 if c else 0) if log: print(i, repr(v), c) return (success, maximum)xxxxxxxxxxfrom datetime import datetimexxxxxxxxxxclass timeit(): def __enter__(self): self.tic = datetime.now() return self def __exit__(self, *args, **kwargs): self.delta = datetime.now() - self.tic self.runtime = (self.delta.microseconds, self.delta)xxxxxxxxxxfrom fuzzingbook.ExpectError import ExpectError, ExpectTimeoutxxxxxxxxxxfrom fuzzingbook.Parser import IterativeEarleyParserxxxxxxxxxxdef process(s): # see the rewrite fn. We remove newlines from grammar training to make it easier to visualize return s.strip().replace('\n', ' ')xxxxxxxxxxdef check_parse(grammar, inputstrs, start_symbol='<START>'): count = 0 e = IterativeEarleyParser(grammar, start_symbol=start_symbol) for s in inputstrs: with ExpectError(): with ExpectTimeout(MaxParseTimeout): for tree in e.parse(process(s)): count += 1 break return (count, len(inputstrs))xxxxxxxxxxfrom fuzzingbook.ExpectError import ExpectError, ExpectTimeoutxxxxxxxxxxfrom fuzzingbook import GrammarFuzzer, Parserxxxxxxxxxxdef save_grammar(grammar, tool, program): with open("build/%s-%s.grammar.json" % (tool, program), 'w+') as f: json.dump(grammar, f) return {k:sorted(grammar[k]) for k in grammar}xxxxxxxxxximport stringxxxxxxxxxxMimid_p = {}Mimid_r = {}Autogram_p = {}Autogram_r = {}Mimid_t ={}Autogram_t ={}for k in program_src: Mimid_p[k] = None Mimid_r[k] = None Mimid_t[k] = None Autogram_p[k] = None Autogram_r[k] = None Autogram_t[k] = Nonexxxxxxxxxximport urllib.parsexxxxxxxxxxcgidecode_golden = { "<START>": [ "<cgidecode-s>" ], "<cgidecode-s>": [ '<cgidecode>', '<cgidecode><cgidecode-s>'], "<cgidecode>": [ "<single_char>", "<percentage_char>" ], "<single_char>": list(string.ascii_lowercase + 
string.ascii_uppercase + string.digits + "-./_~"), "<percentage_char>": [urllib.parse.quote(i) for i in string.punctuation if i not in "-./_~"],}xxxxxxxxxxcgidecode_samples = []xxxxxxxxxxwith timeit() as t: cgidecode_grammar = accio_grammar('cgidecode.py', VARS['cgidecode_src'], cgidecode_samples)Mimid_t['cgidecode.py'] = t.runtimexxxxxxxxxxsave_grammar(cgidecode_grammar, 'mimid', 'cgidecode')xxxxxxxxxxif 'cgidecode' in CHECK: result = check_precision('cgidecode.py', cgidecode_grammar) Mimid_p['cgidecode.py'] = result print(result)xxxxxxxxxximport subjects.cgidecodexxxxxxxxxxif 'cgidecode' in CHECK: result = check_recall(cgidecode_golden, cgidecode_grammar, subjects.cgidecode.main) Mimid_r['cgidecode.py'] = result print(result)xxxxxxxxxx%%timewith timeit() as t: autogram_cgidecode_grammar_t = recover_grammar_with_taints('cgidecode.py', VARS['cgidecode_src'], cgidecode_samples)Autogram_t['cgidecode.py'] = t.runtimexxxxxxxxxxsave_grammar(autogram_cgidecode_grammar_t, 'autogram_t', 'cgidecode')xxxxxxxxxxif 'cgidecode' in CHECK: result = check_precision('cgidecode.py', autogram_cgidecode_grammar_t) Autogram_p['cgidecode.py'] = result print(result)xxxxxxxxxxif 'cgidecode' in CHECK: result = check_recall(cgidecode_golden, autogram_cgidecode_grammar_t, subjects.cgidecode.main) Autogram_r['cgidecode.py'] = result print(result)xxxxxxxxxxcalc_golden = { "<START>": [ "<expr>" ], "<expr>": [ "<term>+<expr>", "<term>-<expr>", "<term>" ], "<term>": [ "<factor>*<term>", "<factor>/<term>", "<factor>" ], "<factor>": [ "(<expr>)", "<number>" ], "<number>": [ "<integer>.<integer>", "<integer>" ], "<integer>": [ "<digit><integer>", "<digit>" ], "<digit>": [ "0", "1", "2", "3", "4", "5", "6", "7", "8", "9" ]}xxxxxxxxxxcalc_samples=[i.strip() for i in '''\(1+2)*3/(423-334+9983)-5-((6)-(701))(123+133*(12-3)/9+8)+33(100)21*333/44+210023*234*22*4(123+133*(12-3)/9+8)+331+231/20-2555+(234-445)1-(41/2)443-334+33-222'''.split('\n') if i.strip()]xxxxxxxxxx%%timewith timeit() as t: calc_grammar 
= accio_grammar('calculator.py', VARS['calc_src'], calc_samples)Mimid_t['calculator.py'] = t.runtimexxxxxxxxxxsave_grammar(calc_grammar, 'mimid', 'calculator')xxxxxxxxxxif 'calculator' in CHECK: result = check_precision('calculator.py', calc_grammar) Mimid_p['calculator.py'] = result print(result)xxxxxxxxxximport subjects.calculatorxxxxxxxxxxif 'calculator' in CHECK: result = check_recall(calc_golden, calc_grammar, subjects.calculator.main) Mimid_r['calculator.py'] = result print(result)xxxxxxxxxx%%timewith timeit() as t: autogram_calc_grammar_t = recover_grammar_with_taints('calculator.py', VARS['calc_src'], calc_samples)Autogram_t['calculator.py'] = t.runtimexxxxxxxxxxsave_grammar(autogram_calc_grammar_t, 'autogram_t', 'calculator')xxxxxxxxxxif 'calculator' in CHECK: result = check_precision('calculator.py', autogram_calc_grammar_t) Autogram_p['calculator.py'] = result print(result)xxxxxxxxxxif 'calculator' in CHECK: result = check_recall(calc_golden, autogram_calc_grammar_t, subjects.calculator.main) Autogram_r['calculator.py'] = result print(result)xxxxxxxxxxmathexpr_golden = { "<START>": [ "<expr>" ], "<word>": [ "pi", "e", "phi", "abs", "acos", "asin", "atan", "atan2", "ceil", "cos", "cosh", "degrees", "exp", "fabs", "floor", "fmod", "frexp", "hypot", "ldexp", "log", "log10", "modf", "pow", "radians", "sin", "sinh", "sqrt", "tan", "tanh", "<alpha>" ], "<alpha>": [ "a", "b", "c", "d", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"], "<expr>": [ "<term>+<expr>", "<term>-<expr>", "<term>" ], "<term>": [ "<factor>*<term>", "<factor>/<term>", "<factor>" ], "<factor>": [ "+<factor>", "-<factor>", "(<expr>)", "<word>(<expr>,<expr>)", "<word>", "<number>" ], "<number>": [ "<integer>.<integer>", "<integer>" ], "<integer>": [ "<digit><integer>", "<digit>" ], "<digit>": [ "0", "1", "2", "3", "4", "5", "6", "7", "8", "9" ]}xxxxxxxxxxmathexpr_samples=[i.strip() for i in 
'''(pi*e+2)*3/(423-334+9983)-5-((6)-(701-x))(123+133*(12-3)/9+8)+33(100)pi * e(1 - 1 + -1) * pi1.0 / 3 * 6(x + e * 10) / 10(a + b) / c1 + pi / 4(1-2)/3.0 + 0.0000-(1 + 2) * 3(1 + 2) * 31001 + 2 * 323*234*22*4(123+133*(12-3)/9+8)+331+231/20-2555+(234-445)1-(41/2)443-334+33-222cos(x+4*3) + 2 * 3exp(0)-(1 + 2) * 3(1-2)/3.0 + 0.0000abs(-2) + pi / 4(pi + e * 10) / 101.0 / 3 * 6cos(pi) * 1atan2(2, 1)hypot(5, 12)pow(3, 5)'''.strip().split('\n') if i.strip()]xxxxxxxxxx%%timewith timeit() as t: mathexpr_grammar = accio_grammar('mathexpr.py', VARS['mathexpr_src'], mathexpr_samples, cache=False)Mimid_t['mathexpr.py'] = t.runtimexxxxxxxxxxsave_grammar(mathexpr_grammar, 'mimid', 'mathexpr')xxxxxxxxxxif 'mathexpr' in CHECK: result = check_precision('mathexpr.py', mathexpr_grammar) Mimid_p['mathexpr.py'] = result print(result)xxxxxxxxxximport subjects.mathexprxxxxxxxxxxif 'mathexpr' in CHECK: result = check_recall(mathexpr_golden, mathexpr_grammar, subjects.mathexpr.main) Mimid_r['mathexpr.py'] = result print(result)xxxxxxxxxx%%timewith timeit() as t: autogram_mathexpr_grammar_t = recover_grammar_with_taints('mathexpr.py', VARS['mathexpr_src'], mathexpr_samples)Autogram_t['mathexpr.py'] = t.runtimexxxxxxxxxxsave_grammar(autogram_mathexpr_grammar_t, 'autogram_t', 'mathexpr')xxxxxxxxxxif 'mathexpr' in CHECK: result = check_precision('mathexpr.py', autogram_mathexpr_grammar_t) Autogram_p['mathexpr.py'] = result print(result)xxxxxxxxxxif 'mathexpr' in CHECK: result = check_recall(mathexpr_golden, autogram_mathexpr_grammar_t, subjects.mathexpr.main) Autogram_r['mathexpr.py'] = result print(result)xxxxxxxxxxurlparse_golden = { "<START>": [ "<url>" ], "<url>": [ "<scheme>://<authority><path><query>" ], "<scheme>": [ "http", "https", "ftp", "ftps" ], "<authority>": [ "<host>", "<host>:<port>", "<userinfo>@<host>", "<userinfo>@<host>:<port>" ], "<user>": [ "user1", "user2", "user3", "user4", "user5" ], "<pass>": [ "pass1", "pass2", "pass3", "pass4", "pass5" ], "<host>": [ "host1", 
"host2", "host3", "host4", "host5" ], "<port>": [ "<nat>" ], "<nat>": [ "10", "20", "30", "40", "50" ], "<userinfo>": [ "<user>:<pass>" ], "<path>": [ "", "/", "/<id>", "/<id><path>" ], "<id>": [ "folder" ], "<query>": [ "", "?<params>" ], "<params>": [ "<param>", "<param>&<params>" ], "<param>": [ "<key>=<value>" ], "<key>": [ "key1", "key2", "key3", "key4" ], "<value>": [ "value1", "value2", "value3", "value4" ]}xxxxxxxxxxurlparse_samples = [i.strip() for i in '''http://www.python.orghttp://www.python.org#abchttp://www.python.org'http://www.python.org#abc'http://www.python.org?q=abchttp://www.python.org/#abchttp://a/b/c/d;p?q#fhttps://www.python.orghttps://www.python.org#abchttps://www.python.org?q=abchttps://www.python.org/#abchttps://a/b/c/d;p?q#fhttp://www.python.org?q=abcfile:///tmp/junk.txtimap://mail.python.org/mbox1mms://wms.sys.hinet.net/cts/Drama/09006251100.asfnfs://server/path/to/file.txtsvn+ssh://svn.zope.org/repos/main/ZConfig/trunk/git+ssh://git@github.com/user/project.gitfile:///tmp/junk.txtimap://mail.python.org/mbox1mms://wms.sys.hinet.net/cts/Drama/09006251100.asfnfs://server/path/to/file.txthttp://www.python.org/#abcsvn+ssh://svn.zope.org/repos/main/ZConfig/trunk/git+ssh://git@github.com/user/project.gitg:hhttp://a/b/c/ghttp://a/b/c/g/http://a/ghttp://ghttp://a/b/c/g?yhttp://a/b/c/g?y/./xhttp://a/b/c/d;p?q#fhttp://a/b/c/d;p?q#shttp://a/b/c/g#shttp://a/b/c/g#s/./xhttp://a/b/c/g?y#shttp://a/b/c/g;xhttp://a/b/c/g;x?y#shttp://a/b/c/http://a/b/https://www.python.orghttp://a/b/ghttp://a/http://a/ghttp://a/b/c/d;p?q#fhttp://a/../gg:hhttp://a/b/c/ghttp://a/b/c/g/https://www.python.org#abchttp://ghttp://a/b/c/g?yhttp://a/b/c/d;p?q#shttp://a/b/c/g#shttp://a/b/c/g?y#shttp://a/b/c/g;xhttp://a/b/c/g;x?y#shttps://www.python.org?q=abchttps://www.python.org/#abchttp://[::1]:5432/foo/http://[dead:beef::1]:5432/foo/http://[dead:beef::]:5432/foo/http://[dead:beef:cafe:5417:affe:8FA3:deaf:feed]:5432/foo/http://[::12.34.56.78]:5432/foo/http://[::ffff:12.34.56.78]:54
32/foo/http://Test.python.org/foo/http://12.34.56.78/foo/http://[::1]/foo/http://[dead:beef::1]/foo/https://a/b/c/d;p?q#fhttp://[dead:beef::]/foo/http://[dead:beef:cafe:5417:affe:8FA3:deaf:feed]/foo/http://[::12.34.56.78]/foo/http://[::ffff:12.34.56.78]/foo/http://Test.python.org:5432/foo/http://12.34.56.78:5432/foo/http://[::1]:5432/foo/http://[dead:beef::1]:5432/foo/http://[dead:beef::]:5432/foo/http://[dead:beef:cafe:5417:affe:8FA3:deaf:feed]:5432/foo/'''.strip().split('\n') if i.strip()]Unfortunately, as we detail in the paper, neither miner generalizes well with the kind of inputs above. The problem is the lack of generalization of string tokens. Hence, we use the samples below, which are generated using the _golden grammar_: they are the output of simply using the golden grammar to fuzz and generate 100 inputs, captured here for deterministic reproduction.
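The fuzzing step referred to above can be sketched as follows: a minimal random grammar expander (an illustration, not the exact fuzzer used in the artifact; `fuzz` and the grammar excerpt below are our own simplified stand-ins for the golden `urlparse` grammar).

```python
import random
import re

RE_NONTERMINAL = re.compile(r'(<[^<> ]*>)')

# A simplified excerpt of the urlparse golden grammar from above.
URL_GRAMMAR = {
    '<start>': ['<scheme>://<host><path>'],
    '<scheme>': ['http', 'https', 'ftp', 'ftps'],
    '<host>': ['host1', 'host2', 'host3'],
    '<path>': ['', '/', '/<id>', '/<id><path>'],
    '<id>': ['folder'],
}

def fuzz(grammar, key='<start>', max_depth=10, depth=0):
    # Pick a random alternative; beyond max_depth, fall back to the
    # shortest alternative to keep the recursion bounded.
    rules = grammar[key]
    rule = random.choice(rules) if depth < max_depth else min(rules, key=len)
    return ''.join(fuzz(grammar, token, max_depth, depth + 1)
                   if token in grammar else token
                   for token in re.split(RE_NONTERMINAL, rule))

random.seed(0)
samples = [fuzz(URL_GRAMMAR) for _ in range(100)]
```

Collecting the outputs of such a fuzzer once and inlining them, as done in the cells here, is what makes the runs deterministic.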
xxxxxxxxxxurlparse_samples = [i.strip() for i in '''https://user4:pass2@host2:30/folder//?key1=value3ftp://user2:pass5@host2?key3=value1ftp://host1/folder//ftp://host4:30/folderhttp://user1:pass4@host1/folderhttps://user1:pass4@host4ftp://host3:40/http://user5:pass3@host1:10/http://host4:10ftp://host4/folder//?key4=value2https://host5/folderftp://user4:pass5@host4/folder//folder//folder/ftp://user5:pass2@host3https://host2/https://user4:pass3@host3/folderhttp://host5:50https://host3/folder?key3=value3http://user5:pass3@host1/folder?key1=value4&key4=value2&key2=value1&key2=value3https://user4:pass3@host1/folderhttp://user3:pass3@host2:40/ftp://host2/folder?key2=value3https://user4:pass4@host2:50/folder/https://user3:pass5@host4?key4=value1ftp://user3:pass3@host1:40?key1=value3https://user1:pass1@host3:50ftps://user2:pass2@host3/https://host4:30/folderhttp://host5/folder/?key2=value2ftps://host3:10/folder/ftp://user4:pass4@host5/folderhttp://user2:pass2@host4:10/folder//folder//folder/ftp://host1:10/folder/ftp://host3?key3=value1&key1=value3ftp://user5:pass2@host4/folder//http://host2ftps://user5:pass3@host3:30ftp://host5/folderhttps://user2:pass2@host4:20/?key2=value4&key1=value2&key3=value3&key3=value2&key4=value3https://host3/folder//folder//folderftp://user2:pass3@host4:50/ftps://user5:pass5@host4/ftps://user3:pass3@host5?key3=value3ftp://host4?key1=value3&key3=value3&key3=value1https://host3/?key4=value2&key1=value2&key4=value3&key2=value4ftps://host1/folder//ftp://host5/folder//https://user2:pass1@host5:10/folder//http://user5:pass5@host2:10/folderhttps://host5/folderftps://user5:pass3@host4:40/?key1=value3http://user1:pass3@host4/folder//?key4=value4&key3=value3http://user2:pass2@host5:50/folder?key4=value3&key4=value2http://host3?key3=value3&key2=value2https://user3:pass3@host2:20/folderhttps://host5/folder?key2=value1&key3=value2&key1=value4&key3=value4&key3=value1&key1=value2&key1=value2ftp://user2:pass5@host5:40/?key4=value4https://user3:pass4@host2:20/ftps
://host3:30/?key3=value1ftp://host3/folderftps://user1:pass1@host5:20/?key3=value1https://user4:pass5@host3?key4=value2ftp://host4:40/folder?key3=value1ftps://host2/folder//folderhttps://host2https://user2:pass5@host5:50?key1=value4&key1=value1&key2=value1&key2=value1https://user4:pass5@host1/?key1=value2&key1=value1http://host4:40/folder?key4=value3&key4=value2http://host1:40ftps://host3:30/ftps://host1/folder/?key4=value1&key1=value4http://user1:pass1@host1:10/folder/?key2=value2&key2=value3&key3=value4http://host3/folder?key2=value2ftps://user4:pass3@host3:50/?key1=value4ftp://host2/folder//folderftp://user2:pass4@host4:40/folder?key3=value2&key2=value1&key2=value2&key4=value3&key3=value3&key3=value1ftps://user4:pass5@host4:50?key4=value2https://host3:10ftp://user1:pass3@host3:10/folder/ftps://host4:30/ftp://user4:pass2@host1/folder/?key3=value2&key2=value4&key1=value3&key3=value2https://host2/folder?key3=value3&key4=value4&key2=value2ftp://host2:50/?key2=value4&key2=value4&key4=value1&key2=value2&key2=value3&key4=value1ftps://user2:pass4@host2/ftps://host3:40/ftps://user4:pass5@host2/ftp://host2:10/?key3=value3&key4=value1http://host2/folder/?key3=value1&key2=value4https://host5/folder?key4=value2https://user3:pass4@host1:20ftp://user3:pass3@host5/https://user1:pass4@host5/https://user3:pass2@host1/folder//ftps://host5:30?key1=value1&key2=value3&key3=value2&key2=value3&key4=value2&key2=value3ftps://user2:pass5@host3:30?key3=value2ftps://host4:10/?key1=value1&key4=value3https://host2:30https://host5:40/folderhttp://user2:pass4@host5:50/folderftp://user5:pass1@host3:50?key3=value2&key1=value4ftp://host1/folder//folder'''.strip().split('\n') if i.strip()]xxxxxxxxxx%%timewith timeit() as t: urlparse_grammar = accio_grammar('urlparse.py', VARS['urlparse_src'], urlparse_samples)Mimid_t['urlparse.py'] = t.runtimexxxxxxxxxxsave_grammar(urlparse_grammar, 'mimid', 'urlparse')xxxxxxxxxxif 'urlparse' in CHECK: result = check_precision('urlparse.py', urlparse_grammar) 
Mimid_p['urlparse.py'] = result print(result)xxxxxxxxxximport subjects.urlparsexxxxxxxxxxif 'urlparse' in CHECK: result = check_recall(urlparse_golden, urlparse_grammar, subjects.urlparse.main) Mimid_r['urlparse.py'] = result print(result)xxxxxxxxxx%%timewith timeit() as t: autogram_urlparse_grammar_t = recover_grammar_with_taints('urlparse.py', VARS['urlparse_src'], urlparse_samples)Autogram_t['urlparse.py'] = t.runtimexxxxxxxxxxsave_grammar(autogram_urlparse_grammar_t, 'autogram_t', 'urlparse')xxxxxxxxxxif 'urlparse' in CHECK: result = check_precision('urlparse.py', autogram_urlparse_grammar_t) Autogram_p['urlparse.py'] = result print(result)xxxxxxxxxxif 'urlparse' in CHECK: result = check_recall(urlparse_golden, autogram_urlparse_grammar_t, subjects.urlparse.main) Autogram_r['urlparse.py'] = result print(result)xxxxxxxxxxnetrc_golden = { "<START>": [ "<entries>" ], "<entries>": [ "<entry><whitespace><entries>", "<entry>" ], "<entry>": [ "machine<whitespace><mvalue><whitespace><fills>", "default<whitespace><fills>" ], "<whitespace>": [ " " ], "<mvalue>": [ "m1", "m2", "m3" ], "<accvalue>": [ "a1", "a2", "a3" ], "<uservalue>": [ "u1", "u2", "u3" ], "<passvalue>": [ "pwd1", "pwd2", "pwd3" ], "<lvalue>": [ "l1", "l2", "l3" ], "<fills>": [ "<fill>", "<fill><whitespace><fills>" ], "<fill>": [ "account<whitespace><accvalue>", "username<whitespace><uservalue>", "password<whitespace><passvalue>", "login<whitespace><lvalue>" ]}xxxxxxxxxxnetrc_samples = [i.strip().replace('\n', ' ') for i in ['''machine m1 login u1 password pwd1''','''machine m2 login u1 password pwd2''','''default login u1 password pwd1''','''machine m1 login u2 password pwd1''','''machine m2 login u2 password pwd2 machine m1 login l1 password pwd1''','''machine m1 login u1 password pwd1 machine m2 login l2 password pwd2''','''machine m2 password pwd2 login u2''','''machine m1 password pwd1 login u1''','''machine m2 login u2 password pwd1''','''default login u2 password pwd3''','''machine m2 
login u2 password pwd1 machine m3 login u3 password pwd1 machine m1 login u1 password pwd2''','''machine m2 login u2 password pwd3machine m1 login u1 password pwd1''','''default login u1 password pwd3machine m2 login u1 password pwd1''','''machine m1 login l1 password p1machine m2 login l2 password p2default login m1 password p1''']]As with `urlparse`, we had to use a restricted set of keywords with _netrc_. The samples below are produced by fuzzing the golden grammar and are captured here for deterministic reproduction.
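As a quick sanity check, samples in this restricted format can be validated against Python's stdlib `netrc` module. This oracle is our own addition for illustration (it is not part of the evaluation), and `stdlib_accepts` is a hypothetical helper name.

```python
import netrc
import os
import tempfile

def stdlib_accepts(sample):
    """Parse `sample` with the stdlib netrc parser; return the
    hosts dict on success, or None on a parse error."""
    fd, path = tempfile.mkstemp()
    try:
        with os.fdopen(fd, 'w') as f:
            f.write(sample + '\n')
        try:
            return netrc.netrc(path).hosts
        except netrc.NetrcParseError:
            return None
    finally:
        os.remove(path)
```

For instance, `stdlib_accepts('machine m1 login l1 password pwd1')` maps `'m1'` to its (login, account, password) triple, while an unknown toplevel keyword is rejected.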
xxxxxxxxxxnetrc_samples = [i.strip() for i in '''machine m1 password pwd3 login l3machine m1 login l3 account a3 login l1 password pwd2machine m2 password pwd2machine m2 password pwd2 account a2machine m2 password pwd3machine m2 password pwd1machine m1 login l3 password pwd1machine m2 password pwd3machine m1 password pwd2 account a1 account a2machine m2 password pwd3machine m2 account a1 password pwd3machine m3 login l3 account a2 password pwd3machine m2 password pwd2 login l3 password pwd2 password pwd2machine m3 password pwd2 login l3machine m3 login l3 account a3 account a2 password pwd3machine m1 password pwd2machine m2 account a3 password pwd3machine m3 password pwd2machine m3 password pwd1 account a1machine m2 password pwd1machine m1 account a1 password pwd1machine m2 login l1 login l2 account a2 login l3 password pwd2 password pwd2 password pwd2machine m3 account a3 login l3 account a1 password pwd3machine m1 password pwd1machine m2 password pwd3machine m2 password pwd3machine m1 account a1 password pwd1 account a1 password pwd3machine m3 password pwd3machine m3 password pwd2machine m2 account a1 account a1 account a2 password pwd2 account a1machine m3 password pwd1 login l2 login l1machine m1 account a3 account a3 password pwd1 machine m3 password pwd2machine m1 login l1 password pwd1machine m3 password pwd2 login l1 machine m1 password pwd2machine m3 account a2 password pwd1machine m1 password pwd3machine m3 login l2 account a2 password pwd2machine m2 password pwd3 machine m2 account a1 login l3 password pwd3 password pwd2machine m1 password pwd2machine m1 password pwd2machine m1 password pwd2machine m2 password pwd3 password pwd2machine m2 login l1 password pwd1 account a1machine m3 password pwd1machine m2 password pwd3 password pwd1machine m1 password pwd3 password pwd3 password pwd1machine m2 password pwd1 password pwd1machine m2 login l2 account a3 password pwd3machine m1 password pwd1machine m1 account a3 password pwd3 account a2 password pwd2 account 
a3 account a3 account a3machine m3 password pwd3 password pwd3 machine m2 password pwd3machine m2 password pwd2 login l2 login l1machine m1 login l3 password pwd2machine m2 login l2 password pwd1machine m2 account a3 password pwd2machine m1 account a2 password pwd1machine m3 login l1 password pwd2 account a2machine m1 password pwd3machine m3 password pwd2machine m1 password pwd3 password pwd3 password pwd1 machine m2 password pwd3machine m1 account a2 account a1 login l2 password pwd2machine m1 login l1 password pwd2 password pwd2 login l3machine m2 password pwd1 password pwd2machine m1 password pwd3 account a3machine m1 login l1 login l2 password pwd2machine m1 account a1 password pwd1 login l2machine m2 password pwd1 login l3machine m2 password pwd2 password pwd1 password pwd3machine m1 password pwd1 account a1 account a2 login l1machine m1 password pwd3machine m2 login l3 password pwd3machine m3 login l2 login l2 password pwd1 login l2machine m2 password pwd1machine m1 password pwd1 login l3 account a2 login l3 password pwd1machine m3 password pwd3machine m3 password pwd1 account a1machine m2 login l3 password pwd1 account a3machine m3 password pwd3machine m2 password pwd1machine m1 login l3 password pwd1 password pwd1machine m3 password pwd3machine m2 password pwd2 login l3 login l2 login l1 account a1machine m1 password pwd1machine m2 password pwd2 login l3machine m2 password pwd2machine m2 password pwd1machine m3 password pwd3machine m1 password pwd1machine m2 account a3 password pwd1machine m2 login l1 password pwd3machine m3 password pwd2 login l1 machine m2 password pwd1machine m2 login l2 account a2 password pwd1 login l2 account a1machine m1 password pwd2machine m3 login l1 password pwd1machine m3 account a2 password pwd2machine m2 login l1 password pwd3 login l2 account a2machine m3 account a1 password pwd2machine m3 login l3 login l3 password pwd1 password pwd1machine m3 password pwd2 password pwd2 password pwd2 account a2machine m3 password 
pwd1'''.strip().split('\n') if i.strip()]xxxxxxxxxx%%timewith timeit() as t: netrc_grammar = accio_grammar('netrc.py', VARS['netrc_src'], netrc_samples)Mimid_t['netrc.py'] = t.runtimexxxxxxxxxxsave_grammar(netrc_grammar, 'mimid', 'netrc')xxxxxxxxxxif 'netrc' in CHECK: result = check_precision('netrc.py', netrc_grammar) Mimid_p['netrc.py'] = result print(result)xxxxxxxxxx!cp build/mylex.py .!cp build/myio.py .xxxxxxxxxximport subjects.netrcxxxxxxxxxxif 'netrc' in CHECK: result = check_recall(netrc_golden, netrc_grammar, subjects.netrc.main) Mimid_r['netrc.py'] = result print(result)xxxxxxxxxx%%timewith timeit() as t: autogram_netrc_grammar_t = recover_grammar_with_taints('netrc.py', VARS['netrc_src'], netrc_samples)Autogram_t['netrc.py'] = t.runtimexxxxxxxxxxsave_grammar(autogram_netrc_grammar_t, 'autogram_t', 'netrc')xxxxxxxxxxif 'netrc' in CHECK: result = check_precision('netrc.py', autogram_netrc_grammar_t) Autogram_p['netrc.py'] = result print(result)xxxxxxxxxxif 'netrc' in CHECK: result = check_recall(netrc_golden, autogram_netrc_grammar_t, subjects.netrc.main) Autogram_r['netrc.py'] = result print(result)This is done through `json.tar.gz`
xxxxxxxxxx# json samplesjson_samples = [i.strip().replace('\n', ' ') for i in ['''{"abcd":[], "efgh":{"y":[], "pqrstuv": null, "p": "", "q":"" , "r": "" , "float1": 1.0, "float2":1.0, "float3":1.0 , "float4": 1.0 , "_124": {"wx" : null, "zzyym!!2@@39": [1.1, 2452, 398, {"x":[[4,53,6,[7 ,8,90 ],10]]}]} } }''','''{"mykey1": [1, 2, 3], "mykey2": null, "mykey":"'`:{}<>&%[]\\\\^~|$'"}''','''{"emptya": [], "emptyh": {}, "emptystr":"", "null":null}''', '''[ "JSON Test Pattern pass1", {"object with 1 member":["array with 1 element"]}, {}, [], -42, true, false, null, { "integer": 1234567890, "real": -9876.543210, "e": 0.123456789e-12, "E": 1.234567890E+34, "": 23456789012E66, "zero": 0, "one": 1, "space": " ", "quote": "\\"", "backslash": "\\\\", "controls": "\\b\\f\\n\\r\\t", "slash": "/ & \\/", "alpha": "abcdefghijklmnopqrstuvwyz", "ALPHA": "ABCDEFGHIJKLMNOPQRSTUVWYZ", "digit": "0123456789", "0123456789": "digit", "special": "`1~!@#$%^&*()_+-={':[,]}|;.</>?", "true": true, "false": false, "null": null, "array":[ ], "object":{ }, "address": "50 St. 
James Street", "url": "http://www.JSON.org/", "comment": "// /* <!-- --", "# -- --> */": " ", " s p a c e d " :[1,2 , 3,4 , 5 , 6 ,7 ],"compact":[1,2,3,4,5,6,7], "jsontext": "{\\"object with 1 member\\":[\\"array with 1 element\\"]}", "quotes": "" %22 0x22 034 "", "\\/\\\\\\"\\b\\f\\n\\r\\t`1~!@#$%^&*()_+-=[]{}|;:',./<>?": "A key can be any string" }, 0.5 ,98.6,99.44,1066,1e1,0.1e1,1e-1,1e00,2e+00,2e-00,"rosebud"]''', '''{"menu": { "id": "file", "value": "File", "popup": { "menuitem": [ {"value": "New", "onclick": "CreateNewDoc()"}, {"value": "Open", "onclick": "OpenDoc()"}, {"value": "Close", "onclick": "CloseDoc()"} ] } }}''', '''{ "XMDEwOlJlcG9zaXRvcnkxODQ2MjA4ODQ=": "-----BEGIN PGP SIGNATURE-----\n\niQIzBAABAQAdFiEESn/54jMNIrGSE6Tp6cQjvhfv7nAFAlnT71cACgkQ6cQjvhfv\n7nCWwA//XVqBKWO0zF+ bZl6pggvky3Oc2j1pNFuRWZ29LXpNuD5WUGXGG209B0hI\nDkmcGk19ZKUTnEUJV2Xd0R7AW01S/YSub7OYcgBkI7qUE13FVHN5ln1KvH2all2n\n2+JCV1HcJLEoTjqIFZSSu/sMdhkLQ9/NsmMAzpf/ iIM0nQOyU4YRex9eD1bYj6nA\nOQPIDdAuaTQj1gFPHYLzM4zJnCqGdRlg0sOM/zC5apBNzIwlgREatOYQSCfCKV7k\nnrU34X8b9BzQaUx48Qa+Dmfn5KQ8dl27RNeWAqlkuWyv3pUauH9UeYW+KyuJeMkU\n+ NyHgAsWFaCFl23kCHThbLStMZOYEnGagrd0hnm1TPS4GJkV4wfYMwnI4KuSlHKB\njHl3Js9vNzEUQipQJbgCgTiWvRJoK3ENwBTMVkKHaqT4x9U4Jk/ XZB6Q8MA09ezJ\n3QgiTjTAGcum9E9QiJqMYdWQPWkaBIRRz5cET6HPB48YNXAAUsfmuYsGrnVLYbG+ \nUpC6I97VybYHTy2O9XSGoaLeMI9CsFn38ycAxxbWagk5mhclNTP5mezIq6wKSwmr\nX11FW3n1J23fWZn5HJMBsRnUCgzqzX3871IqLYHqRJ/bpZ4h20RhTyPj5c/z7QXp\neSakNQMfbbMcljkha+ ZMuVQX1K9aRlVqbmv3ZMWh+OijLYVU2bc=\n=5Io4\n-----END PGP SIGNATURE-----\n"}''', '''{"widget": { "debug": "on", "window": { "title": "Sample Konfabulator Widget", "name": "main_window", "width": 500, "height": 500 }, "image": { "src": "Images/Sun.png", "name": "sun1", "hOffset": 250, "vOffset": 250, "alignment": "center" }, "text": { "data": "Click Here", "size": 36, "style": "bold", "name": "text1", "hOffset": 250, "vOffset": 100, "alignment": "center", "onMouseUp": "sun1.opacity = (sun1.opacity / 100) * 90;" } }}''','''{ "fruit": 
"Apple", "size": "Large", "color": "Red", "product": "Jam"}''','''{"menu": { "header": "SVG Viewer", "items": [ {"id": "Open"}, {"id": "OpenNew", "label": "Open New"}, null, {"id": "ZoomIn", "label": "Zoom In"}, {"id": "ZoomOut", "label": "Zoom Out"}, {"id": "OriginalView", "label": "Original View"}, null, {"id": "Quality"}, {"id": "Pause"}, {"id": "Mute"}, null, {"id": "Find", "label": "Find..."}, {"id": "FindAgain", "label": "Find Again"}, {"id": "Copy"}, {"id": "CopyAgain", "label": "Copy Again"}, {"id": "CopySVG", "label": "Copy SVG"}, {"id": "ViewSVG", "label": "View SVG"}, {"id": "ViewSource", "label": "View Source"}, {"id": "SaveAs", "label": "Save As"}, null, {"id": "Help"}, {"id": "About", "label": "About Adobe CVG Viewer..."} ] }}''','''{ "quiz": { "sport": { "q1": { "question": "Which one is correct team name in NBA?", "options": [ "New York Bulls", "Los Angeles Kings", "Golden State Warriros", "Huston Rocket" ], "answer": "Huston Rocket" } }, "maths": { "q1": { "question": "5 + 7 = ?", "options": [ "10", "11", "12", "13" ], "answer": "12" }, "q2": { "question": "12 - 8 = ?", "options": [ "1", "2", "3", "4" ], "answer": "4" } } }}''','''{ "colors": [ { "color": "black", "category": "hue", "type": "primary", "code": { "rgba": [255,255,255,1], "hex": "#000" } }, { "color": "white", "category": "value", "code": { "rgba": [0,0,0,1], "hex": "#FFF" } }, { "color": "red", "category": "hue", "type": "primary", "code": { "rgba": [255,0,0,1], "hex": "#FF0" } }, { "color": "blue", "category": "hue", "type": "primary", "code": { "rgba": [0,0,255,1], "hex": "#00F" } }, { "color": "yellow", "category": "hue", "type": "primary", "code": { "rgba": [255,255,0,1], "hex": "#FF0" } }, { "color": "green", "category": "hue", "type": "secondary", "code": { "rgba": [0,255,0,1], "hex": "#0F0" } } ]}''','''{ "aliceblue": "#f0f8ff", "antiquewhite": "#faebd7", "aqua": "#00ffff", "aquamarine": "#7fffd4", "azure": "#f0ffff", "beige": "#f5f5dc", "bisque": "#ffe4c4", "black": 
"#000000", "blanchedalmond": "#ffebcd", "blue": "#0000ff", "blueviolet": "#8a2be2", "brown": "#a52a2a", "majenta": "#ff0ff"}''']]xxxxxxxxxx%%timewith timeit() as t: microjson_grammar = accio_grammar('microjson.py', VARS['microjson_src'], json_samples)Mimid_t['microjson.py'] = t.runtimexxxxxxxxxxsave_grammar(microjson_grammar, 'mimid', 'microjson')xxxxxxxxxxif 'microjson' in CHECK: result = check_precision('microjson.py', microjson_grammar) Mimid_p['microjson.py'] = result print(result)xxxxxxxxxximport subjects.microjsonxxxxxxxxxximport pathlibxxxxxxxxxxdef slurp(fn): with open(fn) as f: s = f.read() return s.replace('\n', ' ').strip()xxxxxxxxxxif shutil.which('gzcat'): !gzcat json.tar.gz | tar -xpf -elif shutil.which('zcat'): !zcat json.tar.gz | tar -xpf -else: assert Falsexxxxxxxxxxjson_path = pathlib.Path('recall/json')json_files = [i.as_posix() for i in json_path.glob('**/*.json')]json_samples_2 = [slurp(i) for i in json_files]xxxxxxxxxxdef check_recall_samples(samples, my_grammar, validator, log=False): n_max = len(samples) ie = IterativeEarleyParser(my_grammar, start_symbol='<START>') my_samples = list(samples) count = 0 while my_samples: src, *my_samples = my_samples try: validator(src) try: # JSON files are much larger because they are from real world for tree in ie.parse(src): count += 1 break if log: print('+', repr(src), count, file=sys.stderr) except: if log: print('-', repr(src), file=sys.stderr) except: pass return (count, n_max)xxxxxxxxxxif 'microjson' in CHECK: result = check_recall_samples(json_samples_2, microjson_grammar, subjects.microjson.main) Mimid_r['microjson.py'] = result print(result)xxxxxxxxxx%%timewith timeit() as t: autogram_microjson_grammar_t = recover_grammar_with_taints('microjson.py', VARS['microjson_src'], json_samples)Autogram_t['microjson.py'] = t.runtimexxxxxxxxxxsave_grammar(autogram_microjson_grammar_t, 'autogram_t', 'microjson')xxxxxxxxxxif 'microjson' in CHECK: result = check_precision('microjson.py', 
autogram_microjson_grammar_t) Autogram_p['microjson.py'] = result print(result)xxxxxxxxxxif 'microjson' in CHECK: result = check_recall_samples(json_samples_2, autogram_microjson_grammar_t, subjects.microjson.main) Autogram_r['microjson.py'] = result print(result)Note that we found and fixed a bug in the Information Flow chapter of the fuzzingbook that was causing Autogram to fail (see `flatten` and `ostr_new` in `recover_grammar_with_taints`). This improved Autogram's precision numbers. However, please inspect the generated grammars: they are still enumerations.
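To illustrate what "still enumerations" means, compare a toy mined grammar that merely lists its training inputs with one that generalizes their structure (both grammars below are fabricated for illustration; they are not actual Autogram or Mimid output).

```python
samples = ['a=0', 'b=1']

# An "enumeration": one alternative per training input; it accepts
# nothing beyond the inputs it was mined from.
enum_grammar = {
    '<START>': ['a=0', 'b=1'],
}

# A generalized grammar: shared structure, so unseen combinations
# such as 'b=7' are also in its language.
generalized_grammar = {
    '<START>': ['<id>=<digit>'],
    '<id>': ['a', 'b'],
    '<digit>': [str(i) for i in range(10)],
}

# The generalized grammar's language, computed by direct expansion.
language = {i + '=' + d
            for i in generalized_grammar['<id>']
            for d in generalized_grammar['<digit>']}
```

The enumeration covers exactly the two training inputs; the generalized grammar covers twenty strings, including inputs never seen during mining.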
xxxxxxxxxxfrom IPython.display import HTML, displayxxxxxxxxxxdef show_table(keys, autogram, mimid, title): keys = [k for k in keys if k in autogram and k in mimid and autogram[k] and mimid[k]] tbl = ['<tr>%s</tr>' % ''.join(["<th>%s</th>" % k for k in ['<b>%s</b>' % title,'Autogram', 'Mimid']])] for k in keys: h_c = "<td>%s</td>" % k a_c = "<td>%s</td>" % autogram.get(k,('',0))[0] m_c = "<td>%s</td>" % mimid.get(k,('',0))[0] tbl.append('<tr>%s</tr>' % ''.join([h_c, a_c, m_c])) return display(HTML('<table>%s</table>' % '\n'.join(tbl)))xxxxxxxxxxdef to_sec(hm): return {k:((hm[k][1]).seconds, ' ') for k in hm if hm[k]}xxxxxxxxxxAutogram_txxxxxxxxxxMimid_txxxxxxxxxxshow_table(Autogram_t.keys(), to_sec(Autogram_t), to_sec(Mimid_t), 'Timing')### Table III (Precision)How many of the inputs generated from our inferred grammar are valid, i.e., accepted by the subject program?
Note that the paper reports precision per 100 inputs. We have increased the count to 1000.
xxxxxxxxxxAutogram_pxxxxxxxxxxMimid_pxxxxxxxxxxshow_table(Autogram_p.keys(), Autogram_p, Mimid_p, 'Precision')### Table IV (Recall)How many *valid* inputs generated by the golden grammar or collected externally are parsable by a parser using our grammar?
Note that recall is reported per 100 inputs in the paper. We have increased the count to 1000. For Microjson, the recall numbers are based on 100 real-world documents, available in `json.tar.gz`, which is bundled along with this notebook.
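Recall is the mirror image: count how many golden-grammar samples a parser using the mined grammar accepts. The sketch below uses a simple greedy PEG-style recognizer over canonical grammars (rules as token lists) for illustration; the real evaluation uses an Earley parser, which also handles grammars this greedy recognizer cannot. The names `match_key`, `match_rule`, and `recall` are our own.

```python
def match_key(grammar, key, text, at=0):
    # Terminals must match literally; nonterminals try their
    # alternatives in order (greedy, no backtracking across choices).
    if key not in grammar:
        return at + len(key) if text.startswith(key, at) else None
    for rule in grammar[key]:
        to = match_rule(grammar, rule, text, at)
        if to is not None:
            return to
    return None

def match_rule(grammar, rule, text, at):
    for token in rule:
        at = match_key(grammar, token, text, at)
        if at is None:
            return None
    return at

def recall(mined_grammar, golden_samples, start='<start>'):
    # Fraction of golden samples the mined grammar parses completely.
    hits = sum(match_key(mined_grammar, start, s) == len(s)
               for s in golden_samples)
    return hits, len(golden_samples)
```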
xxxxxxxxxxAutogram_rxxxxxxxxxxMimid_rxxxxxxxxxxshow_table(Autogram_p.keys(), Autogram_r, Mimid_r, 'Recall')xxxxxxxxxx%%var calc_rec_srcimport stringdef is_digit(i): return i in list(string.digits)def parse_num(s,i): while s[i:] and is_digit(s[i]): i = i +1 return idef parse_paren(s, i): assert s[i] == '(' i = parse_expr(s, i+1) if s[i:] == '': raise Exception(s, i) assert s[i] == ')' return i+1def parse_expr(s, i = 0): expr = [] is_op = True while s[i:] != '': c = s[i] if c in list(string.digits): if not is_op: raise Exception(s,i) i = parse_num(s,i) is_op = False elif c in ['+', '-', '*', '/']: if is_op: raise Exception(s,i) is_op = True i = i + 1 elif c == '(': if not is_op: raise Exception(s,i) i = parse_paren(s, i) is_op = False elif c == ')': break else: raise Exception(s,i) if is_op: raise Exception(s,i) return idef main(arg): parse_expr(arg)xxxxxxxxxxcalc_rec_grammar = accio_grammar('cal.py', VARS['calc_rec_src'], calc_samples)xxxxxxxxxxcalc_rec_grammarxxxxxxxxxx%%var myparsec_src# From https://github.com/xmonader/pyparsecfrom functools import reduceimport string flatten = lambda l: [item for sublist in l for item in (sublist if isinstance(sublist, list) else [sublist] )]class Maybe: passclass Just(Maybe): def __init__(self, val): self.val = val def __str__(self): return "<Just %s>"%str(self.val) class Nothing(Maybe): _instance = None def __new__(class_, *args, **kwargs): if not isinstance(class_._instance, class_): class_._instance = object.__new__(class_, *args, **kwargs) return class_._instance def __str__(self): return "<Nothing>"class Either: passclass Left: def __init__(self, errmsg): self.errmsg = errmsg def __str__(self): return "(Left %s)"%self.errmsg __repr__ = __str__ def map(self, f): return self class Right: def __init__(self, val): self.val = val def unwrap(self): return self.val def val0(self): if isinstance(self.val[0], list): return flatten(self.val[0]) else: return [self.val[0]] def __str__(self): return "(Right %s)"% str(self.val) __repr__ 
= __str__ def map(self, f): return Right( (f(self.val0), self.val[1])) class Parser: def __init__(self, f, tag=''): self.f = f self._suppressed = False self.tag = tag def parse(self, *args, **kwargs): return self.f(*args, **kwargs) __call__ = parse def __rshift__(self, rparser): return and_then(self, rparser) def __lshift__(self, rparser): return and_then(self, rparser) def __or__(self, rparser): return or_else(self, rparser) def map(self, transformer): return Parser(lambda *args, **kwargs: self.f(*args, **kwargs).map(transformer), self.tag) def __mul__(self, times): return n(self, times) set_action = map def suppress(self): self._suppressed = True return selfdef pure(x): def curried(s): return Right((x, s)) return Parser(curried, 'pure')def ap(p1, p2): def curried(s): res = p2(s) return p1(*res.val[0]) return currieddef compose(p1, p2): def newf(*args, **kwargs): return p2(p1(*args, **kwargs)) return newfdef run_parser(p, inp): return p(inp)def _isokval(v): if isinstance(v, str) and not v.strip(): return False if isinstance(v, list) and v and v[0] == "": return False return Truedef and_then(p1, p2): def curried(s): res1 = p1(s) if isinstance(res1, Left): return res1 else: res2 = p2(res1.val[1]) # parse remaining chars. 
if isinstance(res2, Right): v1 = res1.val0 v2 = res2.val0 vs = [] if not p1._suppressed and _isokval(v1): vs += v1 if not p2._suppressed and _isokval(v2): vs += v2 return Right( (vs, res2.val[1])) return res2 return Parser(curried, 'and_then')def n(parser, count): def curried(s): fullparsed = "" for i in range(count): res = parser(s) if isinstance(res, Left): return res else: parsed, remaining = res.unwrap() s = remaining fullparsed += parsed return Right((fullparsed, s)) return Parser(curried, 'n')def or_else(p1, p2): def curried(s): res = p1(s) if isinstance(res, Right): return res else: res = p2(s) if isinstance(res, Right): return res else: return Left("Failed at both") return Parser(curried, 'or_else')def char(c): def curried(s): if not s: msg = "S is empty" return Left(msg) else: if s[0] == c: return Right((c, s[1:]) ) else: return Left("Expecting '%s' and found '%s'"%(c, s[0])) return Parser(curried, 'char')foldl = reducedef choice(parsers): return foldl(or_else, parsers)def any_of(chars): return choice(list(map(char, chars)))def parse_string(s): return foldl(and_then, list(map(char, list(s)))).map(lambda l: "".join(l))def until_seq(seq): def curried(s): if not s: msg = "S is empty" return Left(msg) else: if seq == s[:len(seq)]: return Right(("", s)) else: return Left("Expecting '%s' and found '%s'"%(seq, s[:len(seq)])) return Parser(curried, 'until_seq')def until(p): def curried(s): res = p(s) if isinstance(res, Left): return res else: return Right(("", s)) return Parser(curried, 'until')chars = parse_stringdef parse_zero_or_more(parser, inp): #zero or more res = parser(inp) if isinstance(res, Left): return "", inp else: firstval, restinpafterfirst = res.val subseqvals, remaining = parse_zero_or_more(parser, restinpafterfirst) values = firstval if subseqvals: if isinstance(firstval, str): values = firstval+subseqvals elif isinstance(firstval, list): values = firstval+ ([subseqvals] if isinstance(subseqvals, str) else subseqvals) return values, remainingdef 
many(parser): def curried(s): return Right(parse_zero_or_more(parser,s)) return Parser(curried, 'many')def many1(parser): def curried(s): res = run_parser(parser, s) if isinstance(res, Left): return res else: return run_parser(many(parser), s) return Parser(curried, 'many1')def optionally(parser): noneparser = Parser(lambda x: Right( (Nothing(), ""))) return or_else(parser, noneparser)def sep_by1(sep, parser): sep_then_parser = sep >> parser return parser >> many(sep_then_parser)def sep_by(sep, parser): return (sep_by1(sep, parser) | Parser(lambda x: Right( ([], "")), 'sep_by'))def forward(parsergeneratorfn): def curried(s): return parsergeneratorfn()(s) return curriedletter = any_of(string.ascii_letters)letter.tag = 'letter'lletter = any_of(string.ascii_lowercase)lletter.tag = 'lletter'uletter = any_of(string.ascii_uppercase)uletter.tag = 'uletter'digit = any_of(string.digits)digit.tag = 'digit'digits = many1(digit)digits.tag = 'digits'whitespace = any_of(string.whitespace)whitespace.tag = 'whitespace'ws = whitespace.suppress()ws.tag = 'ws'letters = many1(letter)letters.tag = 'letters'word = lettersword.tag = 'word'alphanumword = many(letter >> (letters|digits))alphanumword.tag = 'alphanumword'num_as_int = digits.map(lambda l: int("".join(l)))num_as_int.tag = 'num_as_int'between = lambda p1, p2 , p3 : p1 >> p2 >> p3surrounded_by = lambda surparser, contentparser: surparser >> contentparser >> surparserquotedword = surrounded_by( (char('"')|char("'")).suppress() , word)quotedword.tag = 'quotedword'option = optionallyoption.tag = 'optionally'# commasepareted_p = sep_by(char(",").suppress(), many1(word) | many1(digit) | many1(quotedword))commaseparated_of = lambda p: sep_by(char(",").suppress(), many(p))xxxxxxxxxxwith open('build/myparsec.py', 'w+') as f: src = rewrite(VARS['myparsec_src'], original='myparsec.py') print(src, file=f)xxxxxxxxxx%%var parsec_srcimport stringimport jsonimport sysimport myparsec as pyparsecalphap = pyparsec.char('a')alphap.tag = 
'alphap'eqp = pyparsec.char('=')eqp.tag = 'eqp'digitp = pyparsec.digitsdigitp.tag = 'digitp'abcparser = alphap >> eqp >> digitpabcparser.tag = 'abcparser'def main(arg): abcparser.parse(arg)xxxxxxxxxxparsec_samples = ['a=0']xxxxxxxxxxdef accio_tree(fname, src, samples, restrict=True): program_src[fname] = src with open('subjects/%s' % fname, 'w+') as f: print(src, file=f) resrc = rewrite(src, fname) if restrict: resrc = resrc.replace('restrict = {\'files\': [sys.argv[0]]}', 'restrict = {}') with open('build/%s' % fname, 'w+') as f: print(resrc, file=f) os.makedirs('samples/%s' % fname, exist_ok=True) sample_files = {("samples/%s/%d.csv"%(fname,i)):s for i,s in enumerate(samples)} for k in sample_files: with open(k, 'w+') as f: print(sample_files[k], file=f) call_trace = [] for i in sample_files: my_tree = do(["python", "./build/%s" % fname, i]).stdout call_trace.append(json.loads(my_tree)[0]) mined_tree = miner(call_trace) generalized_tree = generalize_iter(mined_tree) return generalized_treexxxxxxxxxxparsec_trees = accio_tree('parsec.py', VARS['parsec_src'], parsec_samples)xxxxxxxxxxzoom(display_tree(parsec_trees[0]['tree'], extract_node=extract_node_o))xxxxxxxxxx%%var peg_srcimport reRE_NONTERMINAL = re.compile(r'(<[^<> ]*>)')def canonical(grammar, letters=False): def split(expansion): if isinstance(expansion, tuple): expansion = expansion[0] return [token for token in re.split(RE_NONTERMINAL, expansion) if token] def tokenize(word): return list(word) if letters else [word] def canonical_expr(expression): return [token for word in split(expression) for token in ([word] if word in grammar else tokenize(word))] return {k: [canonical_expr(expression) for expression in alternatives] for k, alternatives in grammar.items()}def crange(character_start, character_end): return [chr(i) for i in range(ord(character_start), ord(character_end) + 1)]def unify_key(grammar, key, text, at=0): if key not in grammar: if text[at:].startswith(key): return at + len(key), (key, []) else: 
return at, None for rule in grammar[key]: to, res = unify_rule(grammar, rule, text, at) if res: return (to, (key, res)) return 0, Nonedef unify_rule(grammar, rule, text, at): results = [] for token in rule: at, res = unify_key(grammar, token, text, at) if res is None: return at, None results.append(res) return at, resultsimport stringVAR_GRAMMAR = { '<start>': ['<assignment>'], '<assignment>': ['<identifier>=<expr>'], '<identifier>': ['<word>'], '<word>': ['<alpha><word>', '<alpha>'], '<alpha>': list(string.ascii_letters), '<expr>': ['<term>+<expr>', '<term>-<expr>', '<term>'], '<term>': ['<factor>*<term>', '<factor>/<term>', '<factor>'], '<factor>': ['+<factor>', '-<factor>', '(<expr>)', '<identifier>', '<number>'], '<number>': ['<integer>.<integer>', '<integer>'], '<integer>': ['<digit><integer>', '<digit>'], '<digit>': crange('0', '9')}def main(arg): C_VG = canonical(VAR_GRAMMAR) unify_key(C_VG, '<start>', arg)xxxxxxxxxxpeg_samples = [ 'a=0',]xxxxxxxxxxpeg_trees = accio_tree('peg.py', VARS['peg_src'], peg_samples, False)xxxxxxxxxxzoom(display_tree(peg_trees[0]['tree'], extract_node=extract_node_o))